-# coding: utf-8
- Agent Communication Protocol
- Specification of the API protocol for communication with an agent. # noqa: E501
- The version of the OpenAPI document: v0.2
- Generated by OpenAPI Generator (https://openapi-generator.tech)
- Do not edit the class manually.
-import atexit
-import datetime
-import json
-import mimetypes
-import os
-import re
-import tempfile
-from multiprocessing.pool import ThreadPool
-from urllib.parse import quote
-from dateutil.parser import parse
-import agbenchmark.agent_protocol_client.models
-from agbenchmark.agent_protocol_client import rest
-from agbenchmark.agent_protocol_client.api_response import ApiResponse
-from agbenchmark.agent_protocol_client.configuration import Configuration
-from agbenchmark.agent_protocol_client.exceptions import ApiException, ApiValueError
-class ApiClient(object):
- """Generic API client for OpenAPI client library builds.
- OpenAPI generic API client. This client handles the client-
- server communication, and is invariant across implementations. Specifics of
- the methods and models for each application are generated from the OpenAPI
- templates.
- :param configuration: .Configuration object for this client
- :param header_name: a header to pass when making calls to the API.
- :param header_value: a header value to pass when making calls to
- the API.
- :param cookie: a cookie to include in the header when making calls
- to the API
- :param pool_threads: The number of threads to use for async requests
- to the API. More threads means more concurrent API requests.
- """
- PRIMITIVE_TYPES = (float, bool, bytes, str, int)
- "int": int,
- "long": int, # TODO remove as only py3 is supported?
- "float": float,
- "str": str,
- "bool": bool,
- "date": datetime.date,
- "datetime": datetime.datetime,
- "object": object,
- }
- _pool = None
- def __init__(
- self,
- configuration=None,
- header_name=None,
- header_value=None,
- cookie=None,
- pool_threads=1,
- ):
- # use default configuration if none is provided
- if configuration is None:
- configuration = Configuration.get_default()
- self.configuration = configuration
- self.pool_threads = pool_threads
- self.rest_client = rest.RESTClientObject(configuration)
- self.default_headers = {}
- if header_name is not None:
- self.default_headers[header_name] = header_value
- self.cookie = cookie
- # Set default User-Agent.
- self.user_agent = "OpenAPI-Generator/1.0.0/python"
- self.client_side_validation = configuration.client_side_validation
- async def __aenter__(self):
- return self
- async def __aexit__(self, exc_type, exc_value, traceback):
- await self.close()
- async def close(self):
- await self.rest_client.close()
- if self._pool:
- self._pool.close()
- self._pool.join()
- self._pool = None
- if hasattr(atexit, "unregister"):
- atexit.unregister(self.close)
- @property
- def pool(self):
- """Create thread pool on first request
- avoids instantiating unused threadpool for blocking clients.
- """
- if self._pool is None:
- atexit.register(self.close)
- self._pool = ThreadPool(self.pool_threads)
- return self._pool
- @property
- def user_agent(self):
- """User agent for this API client"""
- return self.default_headers["User-Agent"]
- @user_agent.setter
- def user_agent(self, value):
- self.default_headers["User-Agent"] = value
- def set_default_header(self, header_name, header_value):
- self.default_headers[header_name] = header_value
- _default = None
- @classmethod
- def get_default(cls):
- """Return new instance of ApiClient.
- This method returns newly created, based on default constructor,
- object of ApiClient class or returns a copy of default
- ApiClient.
- :return: The ApiClient object.
- """
- if cls._default is None:
- cls._default = ApiClient()
- return cls._default
- @classmethod
- def set_default(cls, default):
- """Set default instance of ApiClient.
- It stores default ApiClient.
- :param default: object of ApiClient.
- """
- cls._default = default
- async def __call_api(
- self,
- resource_path,
- method,
- path_params=None,
- query_params=None,
- header_params=None,
- body=None,
- post_params=None,
- files=None,
- response_types_map=None,
- auth_settings=None,
- _return_http_data_only=None,
- collection_formats=None,
- _preload_content=True,
- _request_timeout=None,
- _host=None,
- _request_auth=None,
- ):
- config = self.configuration
- # header parameters
- header_params = header_params or {}
- header_params.update(self.default_headers)
- if self.cookie:
- header_params["Cookie"] = self.cookie
- if header_params:
- header_params = self.sanitize_for_serialization(header_params)
- header_params = dict(
- self.parameters_to_tuples(header_params, collection_formats)
- )
- # path parameters
- if path_params:
- path_params = self.sanitize_for_serialization(path_params)
- path_params = self.parameters_to_tuples(path_params, collection_formats)
- for k, v in path_params:
- # specified safe chars, encode everything
- resource_path = resource_path.replace(
- "{%s}" % k, quote(str(v), safe=config.safe_chars_for_path_param)
- )
- # post parameters
- if post_params or files:
- post_params = post_params if post_params else []
- post_params = self.sanitize_for_serialization(post_params)
- post_params = self.parameters_to_tuples(post_params, collection_formats)
- post_params.extend(self.files_parameters(files))
- # auth setting
- self.update_params_for_auth(
- header_params,
- query_params,
- auth_settings,
- resource_path,
- method,
- body,
- request_auth=_request_auth,
- )
- # body
- if body:
- body = self.sanitize_for_serialization(body)
- # request url
- if _host is None:
- url = self.configuration.host + resource_path
- else:
- # use server/host defined in path or operation instead
- url = _host + resource_path
- # query parameters
- if query_params:
- query_params = self.sanitize_for_serialization(query_params)
- url_query = self.parameters_to_url_query(query_params, collection_formats)
- url += "?" + url_query
- try:
- # perform request and return response
- response_data = await self.request(
- method,
- url,
- query_params=query_params,
- headers=header_params,
- post_params=post_params,
- body=body,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- )
- except ApiException as e:
- if e.body:
- e.body = e.body.decode("utf-8")
- raise e
- self.last_response = response_data
- return_data = None # assuming derialization is not needed
- # data needs deserialization or returns HTTP data (deserialized) only
- if _preload_content or _return_http_data_only:
- response_type = response_types_map.get(str(response_data.status), None)
- if response_type == "bytearray":
- response_data.data = response_data.data
- else:
- match = None
- content_type = response_data.getheader("content-type")
- if content_type is not None:
- match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type)
- encoding = match.group(1) if match else "utf-8"
- response_data.data = response_data.data.decode(encoding)
- # deserialize response data
- if response_type == "bytearray":
- return_data = response_data.data
- elif response_type:
- return_data = self.deserialize(response_data, response_type)
- else:
- return_data = None
- if _return_http_data_only:
- return return_data
- else:
- return ApiResponse(
- status_code=response_data.status,
- data=return_data,
- headers=response_data.getheaders(),
- raw_data=response_data.data,
- )
- def sanitize_for_serialization(self, obj):
- """Builds a JSON POST object.
- If obj is None, return None.
- If obj is str, int, long, float, bool, return directly.
- If obj is datetime.datetime, datetime.date
- convert to string in iso8601 format.
- If obj is list, sanitize each element in the list.
- If obj is dict, return the dict.
- If obj is OpenAPI model, return the properties dict.
- :param obj: The data to serialize.
- :return: The serialized form of data.
- """
- if obj is None:
- return None
- elif isinstance(obj, self.PRIMITIVE_TYPES):
- return obj
- elif isinstance(obj, list):
- return [self.sanitize_for_serialization(sub_obj) for sub_obj in obj]
- elif isinstance(obj, tuple):
- return tuple(self.sanitize_for_serialization(sub_obj) for sub_obj in obj)
- elif isinstance(obj, (datetime.datetime, datetime.date)):
- return obj.isoformat()
- if isinstance(obj, dict):
- obj_dict = obj
- else:
- # Convert model obj to dict except
- # attributes `openapi_types`, `attribute_map`
- # and attributes which value is not None.
- # Convert attribute name to json key in
- # model definition for request.
- obj_dict = obj.to_dict()
- return {
- key: self.sanitize_for_serialization(val) for key, val in obj_dict.items()
- }
- def deserialize(self, response, response_type):
- """Deserializes response into an object.
- :param response: RESTResponse object to be deserialized.
- :param response_type: class literal for
- deserialized object, or string of class name.
- :return: deserialized object.
- """
- # handle file downloading
- # save response body into a tmp file and return the instance
- if response_type == "file":
- return self.__deserialize_file(response)
- # fetch data from response object
- try:
- data = json.loads(response.data)
- except ValueError:
- data = response.data
- return self.__deserialize(data, response_type)
- def __deserialize(self, data, klass):
- """Deserializes dict, list, str into an object.
- :param data: dict, list or str.
- :param klass: class literal, or string of class name.
- :return: object.
- """
- if data is None:
- return None
- if type(klass) == str:
- if klass.startswith("List["):
- sub_kls = re.match(r"List\[(.*)]", klass).group(1)
- return [self.__deserialize(sub_data, sub_kls) for sub_data in data]
- if klass.startswith("Dict["):
- sub_kls = re.match(r"Dict\[([^,]*), (.*)]", klass).group(2)
- return {k: self.__deserialize(v, sub_kls) for k, v in data.items()}
- # convert str to class
- if klass in self.NATIVE_TYPES_MAPPING:
- klass = self.NATIVE_TYPES_MAPPING[klass]
- else:
- klass = getattr(agbenchmark.agent_protocol_client.models, klass)
- if klass in self.PRIMITIVE_TYPES:
- return self.__deserialize_primitive(data, klass)
- elif klass == object:
- return self.__deserialize_object(data)
- elif klass == datetime.date:
- return self.__deserialize_date(data)
- elif klass == datetime.datetime:
- return self.__deserialize_datetime(data)
- else:
- return self.__deserialize_model(data, klass)
- def call_api(
- self,
- resource_path,
- method,
- path_params=None,
- query_params=None,
- header_params=None,
- body=None,
- post_params=None,
- files=None,
- response_types_map=None,
- auth_settings=None,
- async_req=None,
- _return_http_data_only=None,
- collection_formats=None,
- _preload_content=True,
- _request_timeout=None,
- _host=None,
- _request_auth=None,
- ):
- """Makes the HTTP request (synchronous) and returns deserialized data.
- To make an async_req request, set the async_req parameter.
- :param resource_path: Path to method endpoint.
- :param method: Method to call.
- :param path_params: Path parameters in the url.
- :param query_params: Query parameters in the url.
- :param header_params: Header parameters to be
- placed in the request header.
- :param body: Request body.
- :param post_params dict: Request post form parameters,
- for `application/x-www-form-urlencoded`, `multipart/form-data`.
- :param auth_settings list: Auth Settings names for the request.
- :param response: Response data type.
- :param files dict: key -> filename, value -> filepath,
- for `multipart/form-data`.
- :param async_req bool: execute request asynchronously
- :param _return_http_data_only: response data instead of ApiResponse
- object with status code, headers, etc
- :param _preload_content: if False, the ApiResponse.data will
- be set to none and raw_data will store the
- HTTP response body without reading/decoding.
- Default is True.
- :param collection_formats: dict of collection formats for path, query,
- header, and post parameters.
- :param _request_timeout: timeout setting for this request. If one
- number provided, it will be total request
- timeout. It can also be a pair (tuple) of
- (connection, read) timeouts.
- :param _request_auth: set to override the auth_settings for an a single
- request; this effectively ignores the authentication
- in the spec for a single request.
- :type _request_token: dict, optional
- :return:
- If async_req parameter is True,
- the request will be called asynchronously.
- The method will return the request thread.
- If parameter async_req is False or missing,
- then the method will return the response directly.
- """
- if not async_req:
- return self.__call_api(
- resource_path,
- method,
- path_params,
- query_params,
- header_params,
- body,
- post_params,
- files,
- response_types_map,
- auth_settings,
- _return_http_data_only,
- collection_formats,
- _preload_content,
- _request_timeout,
- _host,
- _request_auth,
- )
- return self.pool.apply_async(
- self.__call_api,
- (
- resource_path,
- method,
- path_params,
- query_params,
- header_params,
- body,
- post_params,
- files,
- response_types_map,
- auth_settings,
- _return_http_data_only,
- collection_formats,
- _preload_content,
- _request_timeout,
- _host,
- _request_auth,
- ),
- )
- def request(
- self,
- method,
- url,
- query_params=None,
- headers=None,
- post_params=None,
- body=None,
- _preload_content=True,
- _request_timeout=None,
- ):
- """Makes the HTTP request using RESTClient."""
- if method == "GET":
- return self.rest_client.get_request(
- url,
- query_params=query_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- headers=headers,
- )
- elif method == "HEAD":
- return self.rest_client.head_request(
- url,
- query_params=query_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- headers=headers,
- )
- elif method == "OPTIONS":
- return self.rest_client.options_request(
- url,
- query_params=query_params,
- headers=headers,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- )
- elif method == "POST":
- return self.rest_client.post_request(
- url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body,
- )
- elif method == "PUT":
- return self.rest_client.put_request(
- url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body,
- )
- elif method == "PATCH":
- return self.rest_client.patch_request(
- url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body,
- )
- elif method == "DELETE":
- return self.rest_client.delete_request(
- url,
- query_params=query_params,
- headers=headers,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body,
- )
- else:
- raise ApiValueError(
- "http method must be `GET`, `HEAD`, `OPTIONS`,"
- " `POST`, `PATCH`, `PUT` or `DELETE`."
- )
- def parameters_to_tuples(self, params, collection_formats):
- """Get parameters as list of tuples, formatting collections.
- :param params: Parameters as dict or list of two-tuples
- :param dict collection_formats: Parameter collection formats
- :return: Parameters as list of tuples, collections formatted
- """
- new_params = []
- if collection_formats is None:
- collection_formats = {}
- for k, v in (
- params.items() if isinstance(params, dict) else params
- ): # noqa: E501
- if k in collection_formats:
- collection_format = collection_formats[k]
- if collection_format == "multi":
- new_params.extend((k, value) for value in v)
- else:
- if collection_format == "ssv":
- delimiter = " "
- elif collection_format == "tsv":
- delimiter = "\t"
- elif collection_format == "pipes":
- delimiter = "|"
- else: # csv is the default
- delimiter = ","
- new_params.append((k, delimiter.join(str(value) for value in v)))
- else:
- new_params.append((k, v))
- return new_params
- def parameters_to_url_query(self, params, collection_formats):
- """Get parameters as list of tuples, formatting collections.
- :param params: Parameters as dict or list of two-tuples
- :param dict collection_formats: Parameter collection formats
- :return: URL query string (e.g. a=Hello%20World&b=123)
- """
- new_params = []
- if collection_formats is None:
- collection_formats = {}
- for k, v in (
- params.items() if isinstance(params, dict) else params
- ): # noqa: E501
- if isinstance(v, (int, float)):
- v = str(v)
- if isinstance(v, bool):
- v = str(v).lower()
- if isinstance(v, dict):
- v = json.dumps(v)
- if k in collection_formats:
- collection_format = collection_formats[k]
- if collection_format == "multi":
- new_params.extend((k, value) for value in v)
- else:
- if collection_format == "ssv":
- delimiter = " "
- elif collection_format == "tsv":
- delimiter = "\t"
- elif collection_format == "pipes":
- delimiter = "|"
- else: # csv is the default
- delimiter = ","
- new_params.append(
- (k, delimiter.join(quote(str(value)) for value in v))
- )
- else:
- new_params.append((k, quote(str(v))))
- return "&".join(["=".join(item) for item in new_params])
- def files_parameters(self, files=None):
- """Builds form parameters.
- :param files: File parameters.
- :return: Form parameters with files.
- """
- params = []
- if files:
- for k, v in files.items():
- if not v:
- continue
- file_names = v if type(v) is list else [v]
- for n in file_names:
- with open(n, "rb") as f:
- filename = os.path.basename(f.name)
- filedata = f.read()
- mimetype = (
- mimetypes.guess_type(filename)[0]
- or "application/octet-stream"
- )
- params.append(tuple([k, tuple([filename, filedata, mimetype])]))
- return params
- def select_header_accept(self, accepts):
- """Returns `Accept` based on an array of accepts provided.
- :param accepts: List of headers.
- :return: Accept (e.g. application/json).
- """
- if not accepts:
- return
- for accept in accepts:
- if re.search("json", accept, re.IGNORECASE):
- return accept
- return accepts[0]
- def select_header_content_type(self, content_types):
- """Returns `Content-Type` based on an array of content_types provided.
- :param content_types: List of content-types.
- :return: Content-Type (e.g. application/json).
- """
- if not content_types:
- return None
- for content_type in content_types:
- if re.search("json", content_type, re.IGNORECASE):
- return content_type
- return content_types[0]
- def update_params_for_auth(
- self,
- headers,
- queries,
- auth_settings,
- resource_path,
- method,
- body,
- request_auth=None,
- ):
- """Updates header and query params based on authentication setting.
- :param headers: Header parameters dict to be updated.
- :param queries: Query parameters tuple list to be updated.
- :param auth_settings: Authentication setting identifiers list.
- :resource_path: A string representation of the HTTP request resource path.
- :method: A string representation of the HTTP request method.
- :body: A object representing the body of the HTTP request.
- The object type is the return value of sanitize_for_serialization().
- :param request_auth: if set, the provided settings will
- override the token in the configuration.
- """
- if not auth_settings:
- return
- if request_auth:
- self._apply_auth_params(
- headers, queries, resource_path, method, body, request_auth
- )
- return
- for auth in auth_settings:
- auth_setting = self.configuration.auth_settings().get(auth)
- if auth_setting:
- self._apply_auth_params(
- headers, queries, resource_path, method, body, auth_setting
- )
- def _apply_auth_params(
- self, headers, queries, resource_path, method, body, auth_setting
- ):
- """Updates the request parameters based on a single auth_setting
- :param headers: Header parameters dict to be updated.
- :param queries: Query parameters tuple list to be updated.
- :resource_path: A string representation of the HTTP request resource path.
- :method: A string representation of the HTTP request method.
- :body: A object representing the body of the HTTP request.
- The object type is the return value of sanitize_for_serialization().
- :param auth_setting: auth settings for the endpoint
- """
- if auth_setting["in"] == "cookie":
- headers["Cookie"] = auth_setting["value"]
- elif auth_setting["in"] == "header":
- if auth_setting["type"] != "http-signature":
- headers[auth_setting["key"]] = auth_setting["value"]
- elif auth_setting["in"] == "query":
- queries.append((auth_setting["key"], auth_setting["value"]))
- else:
- raise ApiValueError("Authentication token must be in `query` or `header`")
- def __deserialize_file(self, response):
- """Deserializes body to file
- Saves response body into a file in a temporary folder,
- using the filename from the `Content-Disposition` header if provided.
- :param response: RESTResponse.
- :return: file path.
- """
- fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path)
- os.close(fd)
- os.remove(path)
- content_disposition = response.getheader("Content-Disposition")
- if content_disposition:
- filename = re.search(
- r'filename=[\'"]?([^\'"\s]+)[\'"]?', content_disposition
- ).group(1)
- path = os.path.join(os.path.dirname(path), filename)
- with open(path, "wb") as f:
- f.write(response.data)
- return path
- def __deserialize_primitive(self, data, klass):
- """Deserializes string to primitive type.
- :param data: str.
- :param klass: class literal.
- :return: int, long, float, str, bool.
- """
- try:
- return klass(data)
- except UnicodeEncodeError:
- return str(data)
- except TypeError:
- return data
- def __deserialize_object(self, value):
- """Return an original value.
- :return: object.
- """
- return value
- def __deserialize_date(self, string):
- """Deserializes string to date.
- :param string: str.
- :return: date.
- """
- try:
- return parse(string).date()
- except ImportError:
- return string
- except ValueError:
- raise rest.ApiException(
- status=0, reason="Failed to parse `{0}` as date object".format(string)
- )
- def __deserialize_datetime(self, string):
- """Deserializes string to datetime.
- The string should be in iso8601 datetime format.
- :param string: str.
- :return: datetime.
- """
- try:
- return parse(string)
- except ImportError:
- return string
- except ValueError:
- raise rest.ApiException(
- status=0,
- reason=("Failed to parse `{0}` as datetime object".format(string)),
- )
- def __deserialize_model(self, data, klass):
- """Deserializes list or dict to model.
- :param data: dict, list.
- :param klass: class literal.
- :return: model object.
- """
- return klass.from_dict(data)