Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
coverage==5.3
freezegun==1.0.0
pytest==6.1.2
pytest==7.1.2
pytest-httpx==0.20.0
pytest-mock==3.3.1
requests-mock==1.8.0
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
"pydantic >=1.8.2, <2.0.0",
"redis >=3.5.3, <4.0.0",
"requests >=2.25.0, <3.0.0",
"httpx >=0.22.0, <0.23.0",
"PyYAML >=6.0, <6.1",
],
python_requires='>=3.8, <4.0',
license='The Unlicense',
Expand Down
209 changes: 209 additions & 0 deletions src/corva/api_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import dataclasses
import functools
import json
import logging
import posixpath
from typing import Callable, List, Optional

import httpx
import yaml


def _httpx_headers_to_dict(headers: httpx.Headers) -> dict:
return json.loads(
repr(headers) # use built-in `repr` as it obfuscates sensitive headers
.strip("Headers()") # strip obsolete data
.replace(
"'", '"'
) # replace single quotes with double ones to get proper json string
)


def _failed_request_msg(
msg: str,
request: httpx.Request,
response: Optional[httpx.Response],
) -> str:
data = {"message": f"Request failed - {msg}"}

if response:
# log response first, so there is less chance it gets truncated
# and users are able to see server error message
data["response"] = {
"code": response.status_code,
"reason": response.reason_phrase,
"headers": _httpx_headers_to_dict(response.headers),
"content": str(response.content),
}

data["request"] = {
"method": request.method,
"url": str(request.url),
"headers": _httpx_headers_to_dict(request.headers),
"content": str(request.content),
}

# use yaml because it is much more readable in logs
return yaml.dump(data, sort_keys=False)


def logging_send(func: Callable, *, logger: logging.Logger) -> Callable:
@functools.wraps(func)
def wrapper(request: httpx.Request, *args, **kwargs):
try:
response = func(request, *args, **kwargs)
except httpx.HTTPError as exc:
# Response was not received at all
logger.error(
_failed_request_msg(msg=str(exc), request=request, response=None)
)
raise

try:
response.raise_for_status()
except httpx.HTTPStatusError as exc:
# Response has unsuccessful status
logger.error(
_failed_request_msg(msg=str(exc), request=request, response=response)
)

return response

return wrapper


class DataApiV1Sdk:
def __init__(self, client: httpx.Client):
self.http = client

def get(
self,
provider: str,
dataset: str,
*,
query: dict,
sort: dict,
limit: int,
skip: int = 0,
fields: Optional[str] = None,
) -> List[dict]:
"""Fetches data from the endpoint GET 'data/{provider}/{dataset}/'.

Args:
provider: company name owning the dataset.
dataset: dataset name.
query: search conditions. Example: {"asset_id": 123} - will fetch data
for asset with id 123.
sort: sort conditions. Example: {"timestamp": 1} - will sort data
in ascending order by timestamp.
limit: number of data points to fecth.
Recommendation for setting the limit:
1. The bigger ↑ each data point is - the smaller ↓ the limit;
2. The smaller ↓ each data point is - the bigger ↑ the limit.
skip: exclude from the response the first N items of the dataset.
Comment thread
ville marked this conversation as resolved.
Note: skip should only be used for small amounts of data, having a
large skip will lead to very slow queries.
fields: comma separated list of fields to return. Example: "_id,data".

Raises:
requests.HTTPError: if request was unsuccessful.

Returns:
Data from dataset.
"""

response = self.http.get(
url=f"data/{provider}/{dataset}/",
params={
"query": json.dumps(query),
"sort": json.dumps(sort),
"fields": fields,
"limit": limit,
"skip": skip,
},
)

response.raise_for_status()

data = list(response.json())

return data


class PlatformApiV1Sdk:
def __init__(self, client: httpx.Client):
self.http = client


class PlatformApiV2Sdk:
def __init__(self, client: httpx.Client):
self.http = client


@dataclasses.dataclass(frozen=True)
class PlatformApiVersions:
v1: PlatformApiV1Sdk
v2: PlatformApiV2Sdk


@dataclasses.dataclass(frozen=True)
class DataApiVersions:
v1: DataApiV1Sdk


class UserApiSdk:
def __init__(
self,
platform_api_url: str,
data_api_url: str,
api_key: str,
app_key: str,
logger: logging.Logger,
timeout: int = 30,
):
self._platform_api_url = platform_api_url
self._data_api_url = data_api_url
self._headers = {
"Authorization": f"API {api_key}",
"X-Corva-App": app_key,
}
self._logger = logger
self._timeout = timeout

def __enter__(self):
data_cli = httpx.Client(
base_url=posixpath.join(self._data_api_url, "api/v1"),
headers=self._headers,
timeout=self._timeout,
)
platform_v1_cli = httpx.Client(
base_url=posixpath.join(self._platform_api_url, "v1"),
headers=self._headers,
timeout=self._timeout,
)
platform_v2_cli = httpx.Client(
base_url=posixpath.join(self._platform_api_url, "v2"),
headers=self._headers,
timeout=self._timeout,
)

data_cli.send = logging_send(func=data_cli.send, logger=self._logger)
platform_v1_cli.send = logging_send(
func=platform_v1_cli.send, logger=self._logger
)
platform_v2_cli.send = logging_send(
func=platform_v2_cli.send, logger=self._logger
)

self.data = DataApiVersions(v1=DataApiV1Sdk(client=data_cli))
self.platform = PlatformApiVersions(
v1=PlatformApiV1Sdk(client=platform_v1_cli),
v2=PlatformApiV2Sdk(client=platform_v2_cli),
)

return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.data.v1.http.close()
self.platform.v1.http.close()
self.platform.v2.http.close()
94 changes: 94 additions & 0 deletions tests/integration/test_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import logging

import httpx
import pytest
import pytest_httpx
import yaml

import corva.api_adapter
import corva.configuration


class TestLogsFailedRequests:
def test_no_response(self, caplog: pytest.LogCaptureFixture):
caplog.handler.setFormatter(logging.Formatter('%(message)s'))

sdk = corva.api_adapter.UserApiSdk(
platform_api_url='',
data_api_url='',
api_key='',
app_key='',
logger=logging.getLogger(),
)

with sdk as s, pytest.raises(httpx.HTTPError):
s.data.v1.http.get(url='whatever')

actual = yaml.safe_load(caplog.text)

expected = {
'message': "Request failed - Request URL is missing an 'http://' or "
"'https://' protocol.",
'request': {
'method': 'GET',
'url': 'api/v1/whatever',
'headers': {
'accept': '*/*',
'accept-encoding': 'gzip, deflate',
'connection': 'keep-alive',
'user-agent': 'python-httpx/0.22.0',
'authorization': '[secure]',
'x-corva-app': '',
},
'content': "b''",
},
}

assert expected == actual

def test_unsuccessful_response(
self, caplog: pytest.LogCaptureFixture, httpx_mock: pytest_httpx.HTTPXMock
):
caplog.handler.setFormatter(logging.Formatter('%(message)s'))
httpx_mock.add_response(status_code=400)

sdk = corva.api_adapter.UserApiSdk(
platform_api_url='',
data_api_url="https://test_url",
api_key='',
app_key='',
logger=logging.getLogger(),
)

with sdk as s:
s.data.v1.http.get(url='')

actual = yaml.safe_load(caplog.text)

expected = {
'message': "Request failed - Client error '400 Bad Request' for url "
"'https://test_url/api/v1/'\nFor more information check: "
"https://httpstatuses.com/400",
'response': {
'code': 400,
'reason': 'Bad Request',
'headers': {},
'content': "b''",
},
'request': {
'method': 'GET',
'url': 'https://test_url/api/v1/',
'headers': {
'host': 'test_url',
'accept': '*/*',
'accept-encoding': 'gzip, deflate',
'connection': 'keep-alive',
'user-agent': 'python-httpx/0.22.0',
'authorization': '[secure]',
'x-corva-app': '',
},
'content': "b''",
},
}

assert actual == expected