diff options
Diffstat (limited to 'benchmark/agbenchmark/agent_protocol_client/api_client.py')
-rw-r--r-- | benchmark/agbenchmark/agent_protocol_client/api_client.py | 838 |
1 files changed, 0 insertions, 838 deletions
diff --git a/benchmark/agbenchmark/agent_protocol_client/api_client.py b/benchmark/agbenchmark/agent_protocol_client/api_client.py deleted file mode 100644 index ce178a96b..000000000 --- a/benchmark/agbenchmark/agent_protocol_client/api_client.py +++ /dev/null @@ -1,838 +0,0 @@ -# coding: utf-8 - -""" - Agent Communication Protocol - - Specification of the API protocol for communication with an agent. # noqa: E501 - - The version of the OpenAPI document: v0.2 - Generated by OpenAPI Generator (https://openapi-generator.tech) - - Do not edit the class manually. -""" - - -import atexit -import datetime -import json -import mimetypes -import os -import re -import tempfile -from multiprocessing.pool import ThreadPool -from urllib.parse import quote - -from dateutil.parser import parse - -import agbenchmark.agent_protocol_client.models -from agbenchmark.agent_protocol_client import rest -from agbenchmark.agent_protocol_client.api_response import ApiResponse -from agbenchmark.agent_protocol_client.configuration import Configuration -from agbenchmark.agent_protocol_client.exceptions import ApiException, ApiValueError - - -class ApiClient(object): - """Generic API client for OpenAPI client library builds. - - OpenAPI generic API client. This client handles the client- - server communication, and is invariant across implementations. Specifics of - the methods and models for each application are generated from the OpenAPI - templates. - - :param configuration: .Configuration object for this client - :param header_name: a header to pass when making calls to the API. - :param header_value: a header value to pass when making calls to - the API. - :param cookie: a cookie to include in the header when making calls - to the API - :param pool_threads: The number of threads to use for async requests - to the API. More threads means more concurrent API requests. - """ - - PRIMITIVE_TYPES = (float, bool, bytes, str, int) - NATIVE_TYPES_MAPPING = { - "int": int, - "long": int, # TODO remove as only py3 is supported? - "float": float, - "str": str, - "bool": bool, - "date": datetime.date, - "datetime": datetime.datetime, - "object": object, - } - _pool = None - - def __init__( - self, - configuration=None, - header_name=None, - header_value=None, - cookie=None, - pool_threads=1, - ): - # use default configuration if none is provided - if configuration is None: - configuration = Configuration.get_default() - self.configuration = configuration - self.pool_threads = pool_threads - - self.rest_client = rest.RESTClientObject(configuration) - self.default_headers = {} - if header_name is not None: - self.default_headers[header_name] = header_value - self.cookie = cookie - # Set default User-Agent. - self.user_agent = "OpenAPI-Generator/1.0.0/python" - self.client_side_validation = configuration.client_side_validation - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await self.close() - - async def close(self): - await self.rest_client.close() - if self._pool: - self._pool.close() - self._pool.join() - self._pool = None - if hasattr(atexit, "unregister"): - atexit.unregister(self.close) - - @property - def pool(self): - """Create thread pool on first request - avoids instantiating unused threadpool for blocking clients. - """ - if self._pool is None: - atexit.register(self.close) - self._pool = ThreadPool(self.pool_threads) - return self._pool - - @property - def user_agent(self): - """User agent for this API client""" - return self.default_headers["User-Agent"] - - @user_agent.setter - def user_agent(self, value): - self.default_headers["User-Agent"] = value - - def set_default_header(self, header_name, header_value): - self.default_headers[header_name] = header_value - - _default = None - - @classmethod - def get_default(cls): - """Return new instance of ApiClient. - - This method returns newly created, based on default constructor, - object of ApiClient class or returns a copy of default - ApiClient. - - :return: The ApiClient object. - """ - if cls._default is None: - cls._default = ApiClient() - return cls._default - - @classmethod - def set_default(cls, default): - """Set default instance of ApiClient. - - It stores default ApiClient. - - :param default: object of ApiClient. - """ - cls._default = default - - async def __call_api( - self, - resource_path, - method, - path_params=None, - query_params=None, - header_params=None, - body=None, - post_params=None, - files=None, - response_types_map=None, - auth_settings=None, - _return_http_data_only=None, - collection_formats=None, - _preload_content=True, - _request_timeout=None, - _host=None, - _request_auth=None, - ): - config = self.configuration - - # header parameters - header_params = header_params or {} - header_params.update(self.default_headers) - if self.cookie: - header_params["Cookie"] = self.cookie - if header_params: - header_params = self.sanitize_for_serialization(header_params) - header_params = dict( - self.parameters_to_tuples(header_params, collection_formats) - ) - - # path parameters - if path_params: - path_params = self.sanitize_for_serialization(path_params) - path_params = self.parameters_to_tuples(path_params, collection_formats) - for k, v in path_params: - # specified safe chars, encode everything - resource_path = resource_path.replace( - "{%s}" % k, quote(str(v), safe=config.safe_chars_for_path_param) - ) - - # post parameters - if post_params or files: - post_params = post_params if post_params else [] - post_params = self.sanitize_for_serialization(post_params) - post_params = self.parameters_to_tuples(post_params, collection_formats) - post_params.extend(self.files_parameters(files)) - - # auth setting - self.update_params_for_auth( - header_params, - query_params, - auth_settings, - resource_path, - method, - body, - request_auth=_request_auth, - ) - - # body - if body: - body = self.sanitize_for_serialization(body) - - # request url - if _host is None: - url = self.configuration.host + resource_path - else: - # use server/host defined in path or operation instead - url = _host + resource_path - - # query parameters - if query_params: - query_params = self.sanitize_for_serialization(query_params) - url_query = self.parameters_to_url_query(query_params, collection_formats) - url += "?" + url_query - - try: - # perform request and return response - response_data = await self.request( - method, - url, - query_params=query_params, - headers=header_params, - post_params=post_params, - body=body, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - ) - except ApiException as e: - if e.body: - e.body = e.body.decode("utf-8") - raise e - - self.last_response = response_data - - return_data = None # assuming derialization is not needed - # data needs deserialization or returns HTTP data (deserialized) only - if _preload_content or _return_http_data_only: - response_type = response_types_map.get(str(response_data.status), None) - - if response_type == "bytearray": - response_data.data = response_data.data - else: - match = None - content_type = response_data.getheader("content-type") - if content_type is not None: - match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type) - encoding = match.group(1) if match else "utf-8" - response_data.data = response_data.data.decode(encoding) - - # deserialize response data - if response_type == "bytearray": - return_data = response_data.data - elif response_type: - return_data = self.deserialize(response_data, response_type) - else: - return_data = None - - if _return_http_data_only: - return return_data - else: - return ApiResponse( - status_code=response_data.status, - data=return_data, - headers=response_data.getheaders(), - raw_data=response_data.data, - ) - - def sanitize_for_serialization(self, obj): - """Builds a JSON POST object. - - If obj is None, return None. - If obj is str, int, long, float, bool, return directly. - If obj is datetime.datetime, datetime.date - convert to string in iso8601 format. - If obj is list, sanitize each element in the list. - If obj is dict, return the dict. - If obj is OpenAPI model, return the properties dict. - - :param obj: The data to serialize. - :return: The serialized form of data. - """ - if obj is None: - return None - elif isinstance(obj, self.PRIMITIVE_TYPES): - return obj - elif isinstance(obj, list): - return [self.sanitize_for_serialization(sub_obj) for sub_obj in obj] - elif isinstance(obj, tuple): - return tuple(self.sanitize_for_serialization(sub_obj) for sub_obj in obj) - elif isinstance(obj, (datetime.datetime, datetime.date)): - return obj.isoformat() - - if isinstance(obj, dict): - obj_dict = obj - else: - # Convert model obj to dict except - # attributes `openapi_types`, `attribute_map` - # and attributes which value is not None. - # Convert attribute name to json key in - # model definition for request. - obj_dict = obj.to_dict() - - return { - key: self.sanitize_for_serialization(val) for key, val in obj_dict.items() - } - - def deserialize(self, response, response_type): - """Deserializes response into an object. - - :param response: RESTResponse object to be deserialized. - :param response_type: class literal for - deserialized object, or string of class name. - - :return: deserialized object. - """ - # handle file downloading - # save response body into a tmp file and return the instance - if response_type == "file": - return self.__deserialize_file(response) - - # fetch data from response object - try: - data = json.loads(response.data) - except ValueError: - data = response.data - - return self.__deserialize(data, response_type) - - def __deserialize(self, data, klass): - """Deserializes dict, list, str into an object. - - :param data: dict, list or str. - :param klass: class literal, or string of class name. - - :return: object. - """ - if data is None: - return None - - if type(klass) == str: - if klass.startswith("List["): - sub_kls = re.match(r"List\[(.*)]", klass).group(1) - return [self.__deserialize(sub_data, sub_kls) for sub_data in data] - - if klass.startswith("Dict["): - sub_kls = re.match(r"Dict\[([^,]*), (.*)]", klass).group(2) - return {k: self.__deserialize(v, sub_kls) for k, v in data.items()} - - # convert str to class - if klass in self.NATIVE_TYPES_MAPPING: - klass = self.NATIVE_TYPES_MAPPING[klass] - else: - klass = getattr(agbenchmark.agent_protocol_client.models, klass) - - if klass in self.PRIMITIVE_TYPES: - return self.__deserialize_primitive(data, klass) - elif klass == object: - return self.__deserialize_object(data) - elif klass == datetime.date: - return self.__deserialize_date(data) - elif klass == datetime.datetime: - return self.__deserialize_datetime(data) - else: - return self.__deserialize_model(data, klass) - - def call_api( - self, - resource_path, - method, - path_params=None, - query_params=None, - header_params=None, - body=None, - post_params=None, - files=None, - response_types_map=None, - auth_settings=None, - async_req=None, - _return_http_data_only=None, - collection_formats=None, - _preload_content=True, - _request_timeout=None, - _host=None, - _request_auth=None, - ): - """Makes the HTTP request (synchronous) and returns deserialized data. - - To make an async_req request, set the async_req parameter. - - :param resource_path: Path to method endpoint. - :param method: Method to call. - :param path_params: Path parameters in the url. - :param query_params: Query parameters in the url. - :param header_params: Header parameters to be - placed in the request header. - :param body: Request body. - :param post_params dict: Request post form parameters, - for `application/x-www-form-urlencoded`, `multipart/form-data`. - :param auth_settings list: Auth Settings names for the request. - :param response: Response data type. - :param files dict: key -> filename, value -> filepath, - for `multipart/form-data`. - :param async_req bool: execute request asynchronously - :param _return_http_data_only: response data instead of ApiResponse - object with status code, headers, etc - :param _preload_content: if False, the ApiResponse.data will - be set to none and raw_data will store the - HTTP response body without reading/decoding. - Default is True. - :param collection_formats: dict of collection formats for path, query, - header, and post parameters. - :param _request_timeout: timeout setting for this request. If one - number provided, it will be total request - timeout. It can also be a pair (tuple) of - (connection, read) timeouts. - :param _request_auth: set to override the auth_settings for an a single - request; this effectively ignores the authentication - in the spec for a single request. - :type _request_token: dict, optional - :return: - If async_req parameter is True, - the request will be called asynchronously. - The method will return the request thread. - If parameter async_req is False or missing, - then the method will return the response directly. - """ - if not async_req: - return self.__call_api( - resource_path, - method, - path_params, - query_params, - header_params, - body, - post_params, - files, - response_types_map, - auth_settings, - _return_http_data_only, - collection_formats, - _preload_content, - _request_timeout, - _host, - _request_auth, - ) - - return self.pool.apply_async( - self.__call_api, - ( - resource_path, - method, - path_params, - query_params, - header_params, - body, - post_params, - files, - response_types_map, - auth_settings, - _return_http_data_only, - collection_formats, - _preload_content, - _request_timeout, - _host, - _request_auth, - ), - ) - - def request( - self, - method, - url, - query_params=None, - headers=None, - post_params=None, - body=None, - _preload_content=True, - _request_timeout=None, - ): - """Makes the HTTP request using RESTClient.""" - if method == "GET": - return self.rest_client.get_request( - url, - query_params=query_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - headers=headers, - ) - elif method == "HEAD": - return self.rest_client.head_request( - url, - query_params=query_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - headers=headers, - ) - elif method == "OPTIONS": - return self.rest_client.options_request( - url, - query_params=query_params, - headers=headers, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - ) - elif method == "POST": - return self.rest_client.post_request( - url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - ) - elif method == "PUT": - return self.rest_client.put_request( - url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - ) - elif method == "PATCH": - return self.rest_client.patch_request( - url, - query_params=query_params, - headers=headers, - post_params=post_params, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - ) - elif method == "DELETE": - return self.rest_client.delete_request( - url, - query_params=query_params, - headers=headers, - _preload_content=_preload_content, - _request_timeout=_request_timeout, - body=body, - ) - else: - raise ApiValueError( - "http method must be `GET`, `HEAD`, `OPTIONS`," - " `POST`, `PATCH`, `PUT` or `DELETE`." - ) - - def parameters_to_tuples(self, params, collection_formats): - """Get parameters as list of tuples, formatting collections. - - :param params: Parameters as dict or list of two-tuples - :param dict collection_formats: Parameter collection formats - :return: Parameters as list of tuples, collections formatted - """ - new_params = [] - if collection_formats is None: - collection_formats = {} - for k, v in ( - params.items() if isinstance(params, dict) else params - ): # noqa: E501 - if k in collection_formats: - collection_format = collection_formats[k] - if collection_format == "multi": - new_params.extend((k, value) for value in v) - else: - if collection_format == "ssv": - delimiter = " " - elif collection_format == "tsv": - delimiter = "\t" - elif collection_format == "pipes": - delimiter = "|" - else: # csv is the default - delimiter = "," - new_params.append((k, delimiter.join(str(value) for value in v))) - else: - new_params.append((k, v)) - return new_params - - def parameters_to_url_query(self, params, collection_formats): - """Get parameters as list of tuples, formatting collections. - - :param params: Parameters as dict or list of two-tuples - :param dict collection_formats: Parameter collection formats - :return: URL query string (e.g. a=Hello%20World&b=123) - """ - new_params = [] - if collection_formats is None: - collection_formats = {} - for k, v in ( - params.items() if isinstance(params, dict) else params - ): # noqa: E501 - if isinstance(v, (int, float)): - v = str(v) - if isinstance(v, bool): - v = str(v).lower() - if isinstance(v, dict): - v = json.dumps(v) - - if k in collection_formats: - collection_format = collection_formats[k] - if collection_format == "multi": - new_params.extend((k, value) for value in v) - else: - if collection_format == "ssv": - delimiter = " " - elif collection_format == "tsv": - delimiter = "\t" - elif collection_format == "pipes": - delimiter = "|" - else: # csv is the default - delimiter = "," - new_params.append( - (k, delimiter.join(quote(str(value)) for value in v)) - ) - else: - new_params.append((k, quote(str(v)))) - - return "&".join(["=".join(item) for item in new_params]) - - def files_parameters(self, files=None): - """Builds form parameters. - - :param files: File parameters. - :return: Form parameters with files. - """ - params = [] - - if files: - for k, v in files.items(): - if not v: - continue - file_names = v if type(v) is list else [v] - for n in file_names: - with open(n, "rb") as f: - filename = os.path.basename(f.name) - filedata = f.read() - mimetype = ( - mimetypes.guess_type(filename)[0] - or "application/octet-stream" - ) - params.append(tuple([k, tuple([filename, filedata, mimetype])])) - - return params - - def select_header_accept(self, accepts): - """Returns `Accept` based on an array of accepts provided. - - :param accepts: List of headers. - :return: Accept (e.g. application/json). - """ - if not accepts: - return - - for accept in accepts: - if re.search("json", accept, re.IGNORECASE): - return accept - - return accepts[0] - - def select_header_content_type(self, content_types): - """Returns `Content-Type` based on an array of content_types provided. - - :param content_types: List of content-types. - :return: Content-Type (e.g. application/json). - """ - if not content_types: - return None - - for content_type in content_types: - if re.search("json", content_type, re.IGNORECASE): - return content_type - - return content_types[0] - - def update_params_for_auth( - self, - headers, - queries, - auth_settings, - resource_path, - method, - body, - request_auth=None, - ): - """Updates header and query params based on authentication setting. - - :param headers: Header parameters dict to be updated. - :param queries: Query parameters tuple list to be updated. - :param auth_settings: Authentication setting identifiers list. - :resource_path: A string representation of the HTTP request resource path. - :method: A string representation of the HTTP request method. - :body: A object representing the body of the HTTP request. - The object type is the return value of sanitize_for_serialization(). - :param request_auth: if set, the provided settings will - override the token in the configuration. - """ - if not auth_settings: - return - - if request_auth: - self._apply_auth_params( - headers, queries, resource_path, method, body, request_auth - ) - return - - for auth in auth_settings: - auth_setting = self.configuration.auth_settings().get(auth) - if auth_setting: - self._apply_auth_params( - headers, queries, resource_path, method, body, auth_setting - ) - - def _apply_auth_params( - self, headers, queries, resource_path, method, body, auth_setting - ): - """Updates the request parameters based on a single auth_setting - - :param headers: Header parameters dict to be updated. - :param queries: Query parameters tuple list to be updated. - :resource_path: A string representation of the HTTP request resource path. - :method: A string representation of the HTTP request method. - :body: A object representing the body of the HTTP request. - The object type is the return value of sanitize_for_serialization(). - :param auth_setting: auth settings for the endpoint - """ - if auth_setting["in"] == "cookie": - headers["Cookie"] = auth_setting["value"] - elif auth_setting["in"] == "header": - if auth_setting["type"] != "http-signature": - headers[auth_setting["key"]] = auth_setting["value"] - elif auth_setting["in"] == "query": - queries.append((auth_setting["key"], auth_setting["value"])) - else: - raise ApiValueError("Authentication token must be in `query` or `header`") - - def __deserialize_file(self, response): - """Deserializes body to file - - Saves response body into a file in a temporary folder, - using the filename from the `Content-Disposition` header if provided. - - :param response: RESTResponse. - :return: file path. - """ - fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path) - os.close(fd) - os.remove(path) - - content_disposition = response.getheader("Content-Disposition") - if content_disposition: - filename = re.search( - r'filename=[\'"]?([^\'"\s]+)[\'"]?', content_disposition - ).group(1) - path = os.path.join(os.path.dirname(path), filename) - - with open(path, "wb") as f: - f.write(response.data) - - return path - - def __deserialize_primitive(self, data, klass): - """Deserializes string to primitive type. - - :param data: str. - :param klass: class literal. - - :return: int, long, float, str, bool. - """ - try: - return klass(data) - except UnicodeEncodeError: - return str(data) - except TypeError: - return data - - def __deserialize_object(self, value): - """Return an original value. - - :return: object. - """ - return value - - def __deserialize_date(self, string): - """Deserializes string to date. - - :param string: str. - :return: date. - """ - try: - return parse(string).date() - except ImportError: - return string - except ValueError: - raise rest.ApiException( - status=0, reason="Failed to parse `{0}` as date object".format(string) - ) - - def __deserialize_datetime(self, string): - """Deserializes string to datetime. - - The string should be in iso8601 datetime format. - - :param string: str. - :return: datetime. - """ - try: - return parse(string) - except ImportError: - return string - except ValueError: - raise rest.ApiException( - status=0, - reason=("Failed to parse `{0}` as datetime object".format(string)), - ) - - def __deserialize_model(self, data, klass): - """Deserializes list or dict to model. - - :param data: dict, list. - :param klass: class literal. - :return: model object. - """ - - return klass.from_dict(data) |