From 27eed84a6a7c8ca8e6af6fb81d4b27892b05cee8 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Sat, 25 Jan 2025 14:36:18 +0400 Subject: [PATCH 01/25] add app logic for meaningful error when mismatched app type is used --- src/corva/handlers.py | 13 ++- src/corva/validate_app_init.py | 78 ++++++++++++++ tests/unit/test_apps_event_types.py | 114 +++++++++++++++++++++ tests/unit/test_partial_rerun_merge_app.py | 5 +- 4 files changed, 207 insertions(+), 3 deletions(-) create mode 100644 src/corva/validate_app_init.py create mode 100644 tests/unit/test_apps_event_types.py diff --git a/src/corva/handlers.py b/src/corva/handlers.py index e86d2cec..66a8701b 100644 --- a/src/corva/handlers.py +++ b/src/corva/handlers.py @@ -36,6 +36,7 @@ from corva.service import service from corva.service.api_sdk import CachingApiSdk, CorvaApiSdk from corva.service.cache_sdk import FakeInternalCacheSdk, InternalRedisSdk, UserRedisSdk +from corva.validate_app_init import validate_app_type_context StreamEventT = TypeVar("StreamEventT", bound=StreamEvent) ScheduledEventT = TypeVar("ScheduledEventT", bound=ScheduledEvent) @@ -60,6 +61,7 @@ def base_handler( func: Callable, raw_event_type: Type[RawBaseEvent], handler: Optional[logging.Handler], + app_decorator_type: str, merge_events: bool = False, ) -> Callable[[Any, Any], List[Any]]: @functools.wraps(func) @@ -89,6 +91,8 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: data_transformation_type = raw_custom_event_type or raw_event_type if merge_events: aws_event = _merge_events(aws_event, data_transformation_type) + + validate_app_type_context(aws_event, app_decorator_type) raw_events = data_transformation_type.from_raw_event(event=aws_event) if ( @@ -144,6 +148,7 @@ def stream( raw_event_type=RawStreamEvent, handler=handler, merge_events=merge_events, + app_decorator_type="stream", ) def wrapper( event: RawStreamEvent, @@ -253,6 +258,7 @@ def scheduled( raw_event_type=RawScheduledEvent, handler=handler, merge_events=merge_events, + app_decorator_type="scheduled", ) def wrapper( event: RawScheduledEvent, @@ -354,7 +360,12 @@ def task( return functools.partial(task, handler=handler) @functools.wraps(func) - @functools.partial(base_handler, raw_event_type=RawTaskEvent, handler=handler) + @functools.partial( + base_handler, + raw_event_type=RawTaskEvent, + handler=handler, + app_decorator_type="task", + ) def wrapper( event: RawTaskEvent, api_key: str, diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py new file mode 100644 index 00000000..006d01e9 --- /dev/null +++ b/src/corva/validate_app_init.py @@ -0,0 +1,78 @@ +import contextlib +import functools +import json +import os +from typing import Any, Optional, Type, Dict + +import pydantic + +from corva.models.base import RawBaseEvent +from corva.models.merge.raw import RawPartialRerunMergeEvent +from corva.models.scheduled.raw import RawScheduledEvent +from corva.models.stream.raw import RawStreamEvent +from corva.models.task import RawTaskEvent + + +def find_leaf_subclasses(base_class): + leaf_classes = [] + for subclass in base_class.__subclasses__(): + if not subclass.__subclasses__(): + leaf_classes.append(subclass) + else: + leaf_classes.extend(find_leaf_subclasses(subclass)) + return leaf_classes + + +def guess_event_type(aws_event: Any) -> Optional[Type[RawBaseEvent]]: + all_children = find_leaf_subclasses(RawBaseEvent) + for child_cls in all_children: + with contextlib.suppress(pydantic.ValidationError): + child_cls.from_raw_event(aws_event) + return child_cls + + +def validate_manifested_type(manifest: Dict[str, Any], app_decorator_type: str) -> None: + manifest_app_type = manifest.get("application", {}).get("type") + if manifest_app_type and manifest_app_type != app_decorator_type: + raise RuntimeError(f'Application with type "{manifest_app_type}" ' + f'can\'t invoke a "{app_decorator_type}" handler') + + +def validate_event_payload(aws_event, app_decorator_type) -> None: + base_event_cls_to_app_type_mapping: Dict[str, Type[RawBaseEvent]] = { + "task": RawTaskEvent, + "stream": RawStreamEvent, + "scheduled": RawScheduledEvent + } + + if event_cls := guess_event_type(aws_event): + if issubclass(event_cls, RawPartialRerunMergeEvent): + # RawPartialRerunMergeEvent(-s) should be ignored here since it is not new app type itself, it's just a + # run mode for existing app types + return + + if not issubclass(event_cls, base_event_cls_to_app_type_mapping.get(app_decorator_type)): + raise RuntimeError( + f'Application with type "{app_decorator_type}" ' + f'was invoked with "{event_cls}" event type' + ) + else: + raise RuntimeError( + f'Application with type "{app_decorator_type}" ' + 'was invoked with "unknown" event type' + ) + + +@functools.lru_cache(maxsize=None) +def read_manifest() -> Optional[Dict[str, Any]]: + manifest_json_path = os.path.join(os.getcwd(), "manifest.json") + if os.path.exists(manifest_json_path): + with open(manifest_json_path, "r") as manifest_json_file: + return json.load(manifest_json_file) + + +def validate_app_type_context(aws_event: Any, app_decorator_type: str) -> None: + if manifest := read_manifest(): + validate_manifested_type(manifest, app_decorator_type) + else: + validate_event_payload(aws_event, app_decorator_type) diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py new file mode 100644 index 00000000..23afe73b --- /dev/null +++ b/tests/unit/test_apps_event_types.py @@ -0,0 +1,114 @@ +from unittest import mock + +import pytest + +from corva import StreamDepthEvent, StreamDepthRecord, StreamTimeEvent, StreamTimeRecord +from corva.handlers import scheduled, stream, task +from corva.models.scheduled.raw import ( + RawScheduledDepthEvent, + RawScheduledNaturalTimeEvent, +) +from corva.models.scheduled.scheduler_type import SchedulerType +from corva.models.task import RawTaskEvent + +raw_scheduled_natural_time_event = RawScheduledNaturalTimeEvent( + asset_id=1, + interval=1, + schedule=1, + app_connection=1, + app_stream=1, + company=1, + scheduler_type=SchedulerType.natural_time, + schedule_start=1, +) + +raw_scheduled_depth_event = RawScheduledDepthEvent( + asset_id=1, + depth_milestone=1.0, + schedule=1, + app_connection=1, + app_stream=1, + company=1, + scheduler_type=SchedulerType.data_depth_milestone, + top_depth=0.0, + bottom_depth=1.0, + log_identifier='', +) + +stream_time_event = StreamTimeEvent( + asset_id=0, company_id=0, records=[StreamTimeRecord(timestamp=0)] +) + +stream_depth_event = StreamDepthEvent( + asset_id=0, company_id=0, records=[StreamDepthRecord(measured_depth=0)] +) + +task_event = RawTaskEvent(task_id='0', version=2) + + +@pytest.mark.parametrize( + 'app_decorator,event_payload', + ( + (task, raw_scheduled_natural_time_event), + (task, raw_scheduled_depth_event), + (task, stream_time_event), + (task, stream_depth_event), + (scheduled, task_event), + (scheduled, stream_time_event), + (scheduled, stream_depth_event), + (stream, task_event), + (stream, raw_scheduled_natural_time_event), + (stream, raw_scheduled_depth_event), + ), +) +def test__lambda_call_with_mismatched_event_type__raise_error( + app_decorator, + event_payload, + context, +): + def lambda_handler(_event, _api): + ... + + particular_app = app_decorator(lambda_handler) + + with pytest.raises(RuntimeError): + particular_app( + event_payload.dict(), + context, + ) + + +@pytest.mark.parametrize( + 'app_decorator,manifested_app_type', + ( + (task, "stream"), + (task, "scheduled"), + (scheduled, "task"), + (scheduled, "stream"), + (stream, "task"), + (stream, "scheduled"), + ), +) +def test__lambda_with_mismatched_manifested_type__raise_error( + app_decorator, + manifested_app_type, + context, +): + def lambda_handler(_event, _api): + ... + + particular_app = app_decorator(lambda_handler) + + mocked_manifest = { + "application": + { + "type": manifested_app_type + } + } + + with mock.patch("corva.handlers.read_manifest", return_value=mocked_manifest): + with pytest.raises(RuntimeError): + particular_app( + {}, + context, + ) diff --git a/tests/unit/test_partial_rerun_merge_app.py b/tests/unit/test_partial_rerun_merge_app.py index 615a666b..d9c7e6d8 100644 --- a/tests/unit/test_partial_rerun_merge_app.py +++ b/tests/unit/test_partial_rerun_merge_app.py @@ -49,9 +49,10 @@ def partial_rerun_merge_app(event, api, asset_cache, rerun_asset_cache): raw_event = dict(RAW_EVENT) raw_event["event_type"] = "unknown_event_type" - with pytest.raises(ValidationError) as e: + + with pytest.raises(RuntimeError) as e: stream_app(raw_event, context) - assert "validation error" in str(e.value) + assert 'Application with type "stream" was invoked with "unknown" event type' in str(e.value) def test_merge_event_handler_called_from_stream_app_returns_expected_cache_values( From c216d325c3cf9d7052073d5ec8c56d3ec129bf12 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Sat, 25 Jan 2025 14:38:09 +0400 Subject: [PATCH 02/25] fix patch call for read_manifest func --- tests/unit/test_apps_event_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index 23afe73b..180614b2 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -106,7 +106,7 @@ def lambda_handler(_event, _api): } } - with mock.patch("corva.handlers.read_manifest", return_value=mocked_manifest): + with mock.patch("corva.validate_app_init.read_manifest", return_value=mocked_manifest): with pytest.raises(RuntimeError): particular_app( {}, From fa5e7a5ea81bc2e38444b72ca6b178415d91707a Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Sat, 25 Jan 2025 14:38:59 +0400 Subject: [PATCH 03/25] fix lint --- src/corva/__init__.py | 20 +------------------- src/corva/validate_app_init.py | 14 +++++++++----- tests/unit/test_apps_event_types.py | 13 +++++-------- tests/unit/test_partial_rerun_merge_app.py | 6 ++++-- 4 files changed, 19 insertions(+), 34 deletions(-) diff --git a/src/corva/__init__.py b/src/corva/__init__.py index b453c4f6..6e345f6a 100644 --- a/src/corva/__init__.py +++ b/src/corva/__init__.py @@ -1,22 +1,4 @@ -from .api import Api -from .handlers import scheduled, stream, task, partial_rerun_merge -from .logger import CORVA_LOGGER as Logger -from .models.rerun import RerunDepth, RerunDepthRange, RerunTime, RerunTimeRange -from .models.scheduled.scheduled import ( - ScheduledDataTimeEvent, - ScheduledDepthEvent, - ScheduledNaturalTimeEvent, -) -from .models.stream.stream import ( - StreamDepthEvent, - StreamDepthRecord, - StreamTimeEvent, - StreamTimeRecord, -) -from .models.merge.merge import PartialRerunMergeEvent -from .models.task import TaskEvent -from .service.cache_sdk import UserRedisSdk as Cache -from .shared import SECRETS as secrets +from .models.scheduled.scheduled import ScheduledDataTimeEvent def __getattr__(name): diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py index 006d01e9..59ce8bc2 100644 --- a/src/corva/validate_app_init.py +++ b/src/corva/validate_app_init.py @@ -2,7 +2,7 @@ import functools import json import os -from typing import Any, Optional, Type, Dict +from typing import Any, Dict, Optional, Type import pydantic @@ -34,15 +34,17 @@ def guess_event_type(aws_event: Any) -> Optional[Type[RawBaseEvent]]: def validate_manifested_type(manifest: Dict[str, Any], app_decorator_type: str) -> None: manifest_app_type = manifest.get("application", {}).get("type") if manifest_app_type and manifest_app_type != app_decorator_type: - raise RuntimeError(f'Application with type "{manifest_app_type}" ' - f'can\'t invoke a "{app_decorator_type}" handler') + raise RuntimeError( + f'Application with type "{manifest_app_type}" ' + f'can\'t invoke a "{app_decorator_type}" handler' + ) def validate_event_payload(aws_event, app_decorator_type) -> None: base_event_cls_to_app_type_mapping: Dict[str, Type[RawBaseEvent]] = { "task": RawTaskEvent, "stream": RawStreamEvent, - "scheduled": RawScheduledEvent + "scheduled": RawScheduledEvent, } if event_cls := guess_event_type(aws_event): @@ -51,7 +53,9 @@ def validate_event_payload(aws_event, app_decorator_type) -> None: # run mode for existing app types return - if not issubclass(event_cls, base_event_cls_to_app_type_mapping.get(app_decorator_type)): + if not issubclass( + event_cls, base_event_cls_to_app_type_mapping.get(app_decorator_type) + ): raise RuntimeError( f'Application with type "{app_decorator_type}" ' f'was invoked with "{event_cls}" event type' diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index 180614b2..aec270bd 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -99,14 +99,11 @@ def lambda_handler(_event, _api): particular_app = app_decorator(lambda_handler) - mocked_manifest = { - "application": - { - "type": manifested_app_type - } - } - - with mock.patch("corva.validate_app_init.read_manifest", return_value=mocked_manifest): + mocked_manifest = {"application": {"type": manifested_app_type}} + + with mock.patch( + "corva.validate_app_init.read_manifest", return_value=mocked_manifest + ): with pytest.raises(RuntimeError): particular_app( {}, diff --git a/tests/unit/test_partial_rerun_merge_app.py b/tests/unit/test_partial_rerun_merge_app.py index d9c7e6d8..9282caf7 100644 --- a/tests/unit/test_partial_rerun_merge_app.py +++ b/tests/unit/test_partial_rerun_merge_app.py @@ -3,7 +3,6 @@ from uuid import uuid4 import pytest -from pydantic import ValidationError import corva @@ -52,7 +51,10 @@ def partial_rerun_merge_app(event, api, asset_cache, rerun_asset_cache): with pytest.raises(RuntimeError) as e: stream_app(raw_event, context) - assert 'Application with type "stream" was invoked with "unknown" event type' in str(e.value) + assert ( + 'Application with type "stream" was invoked with "unknown" event type' + in str(e.value) + ) def test_merge_event_handler_called_from_stream_app_returns_expected_cache_values( From 4404c624043188df9a4f3c79c512ffc75591c565 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Sat, 25 Jan 2025 14:44:58 +0400 Subject: [PATCH 04/25] fix type checkers --- src/corva/validate_app_init.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py index 59ce8bc2..d3f30fe6 100644 --- a/src/corva/validate_app_init.py +++ b/src/corva/validate_app_init.py @@ -29,6 +29,7 @@ def guess_event_type(aws_event: Any) -> Optional[Type[RawBaseEvent]]: with contextlib.suppress(pydantic.ValidationError): child_cls.from_raw_event(aws_event) return child_cls + return None def validate_manifested_type(manifest: Dict[str, Any], app_decorator_type: str) -> None: @@ -49,13 +50,12 @@ def validate_event_payload(aws_event, app_decorator_type) -> None: if event_cls := guess_event_type(aws_event): if issubclass(event_cls, RawPartialRerunMergeEvent): - # RawPartialRerunMergeEvent(-s) should be ignored here since it is not new app type itself, it's just a - # run mode for existing app types + # RawPartialRerunMergeEvent(-s) should be ignored here since + # it is not new app type itself it's just a run mode for existing app types return - if not issubclass( - event_cls, base_event_cls_to_app_type_mapping.get(app_decorator_type) - ): + expected_base_event_cls = base_event_cls_to_app_type_mapping[app_decorator_type] + if not issubclass(event_cls, expected_base_event_cls): raise RuntimeError( f'Application with type "{app_decorator_type}" ' f'was invoked with "{event_cls}" event type' @@ -73,6 +73,7 @@ def read_manifest() -> Optional[Dict[str, Any]]: if os.path.exists(manifest_json_path): with open(manifest_json_path, "r") as manifest_json_file: return json.load(manifest_json_file) + return None def validate_app_type_context(aws_event: Any, app_decorator_type: str) -> None: From 21de5cb6514e58c16f633ebdb2efa9dc1a5f3a15 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Sat, 25 Jan 2025 14:48:02 +0400 Subject: [PATCH 05/25] fix tests for test_apps_event_types.py --- tests/unit/test_apps_event_types.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index aec270bd..30753b6a 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -2,13 +2,18 @@ import pytest -from corva import StreamDepthEvent, StreamDepthRecord, StreamTimeEvent, StreamTimeRecord from corva.handlers import scheduled, stream, task from corva.models.scheduled.raw import ( RawScheduledDepthEvent, RawScheduledNaturalTimeEvent, ) from corva.models.scheduled.scheduler_type import SchedulerType +from corva.models.stream.stream import ( + StreamDepthEvent, + StreamDepthRecord, + StreamTimeEvent, + StreamTimeRecord, +) from corva.models.task import RawTaskEvent raw_scheduled_natural_time_event = RawScheduledNaturalTimeEvent( From 7ee1ddb5fed608f013837e0e0e99833aebbe0c8e Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Sun, 26 Jan 2025 18:53:27 +0400 Subject: [PATCH 06/25] revert some changes; optimize usage for BASE_EVENT_CLS_TO_APP_TYPE_MAPPING --- src/corva/__init__.py | 20 +++++++++++++++++++- src/corva/validate_app_init.py | 13 +++++++------ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/corva/__init__.py b/src/corva/__init__.py index 6e345f6a..b453c4f6 100644 --- a/src/corva/__init__.py +++ b/src/corva/__init__.py @@ -1,4 +1,22 @@ -from .models.scheduled.scheduled import ScheduledDataTimeEvent +from .api import Api +from .handlers import scheduled, stream, task, partial_rerun_merge +from .logger import CORVA_LOGGER as Logger +from .models.rerun import RerunDepth, RerunDepthRange, RerunTime, RerunTimeRange +from .models.scheduled.scheduled import ( + ScheduledDataTimeEvent, + ScheduledDepthEvent, + ScheduledNaturalTimeEvent, +) +from .models.stream.stream import ( + StreamDepthEvent, + StreamDepthRecord, + StreamTimeEvent, + StreamTimeRecord, +) +from .models.merge.merge import PartialRerunMergeEvent +from .models.task import TaskEvent +from .service.cache_sdk import UserRedisSdk as Cache +from .shared import SECRETS as secrets def __getattr__(name): diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py index d3f30fe6..238dd172 100644 --- a/src/corva/validate_app_init.py +++ b/src/corva/validate_app_init.py @@ -12,6 +12,12 @@ from corva.models.stream.raw import RawStreamEvent from corva.models.task import RawTaskEvent +BASE_EVENT_CLS_TO_APP_TYPE_MAPPING: Dict[str, Type[RawBaseEvent]] = { + "task": RawTaskEvent, + "stream": RawStreamEvent, + "scheduled": RawScheduledEvent, +} + def find_leaf_subclasses(base_class): leaf_classes = [] @@ -42,11 +48,6 @@ def validate_manifested_type(manifest: Dict[str, Any], app_decorator_type: str) def validate_event_payload(aws_event, app_decorator_type) -> None: - base_event_cls_to_app_type_mapping: Dict[str, Type[RawBaseEvent]] = { - "task": RawTaskEvent, - "stream": RawStreamEvent, - "scheduled": RawScheduledEvent, - } if event_cls := guess_event_type(aws_event): if issubclass(event_cls, RawPartialRerunMergeEvent): @@ -54,7 +55,7 @@ def validate_event_payload(aws_event, app_decorator_type) -> None: # it is not new app type itself it's just a run mode for existing app types return - expected_base_event_cls = base_event_cls_to_app_type_mapping[app_decorator_type] + expected_base_event_cls = BASE_EVENT_CLS_TO_APP_TYPE_MAPPING[app_decorator_type] if not issubclass(event_cls, expected_base_event_cls): raise RuntimeError( f'Application with type "{app_decorator_type}" ' From b7bb30405a07735954312ec01d6e872856e19fb9 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 14:46:46 +0400 Subject: [PATCH 07/25] add more precise tests for new functionality --- tests/unit/conftest.py | 8 ++++++ tests/unit/test_apps_event_types.py | 43 +++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 71461bb8..7ebfaaa6 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -3,6 +3,7 @@ from corva.configuration import SETTINGS from corva.testing import TestClient +from corva.validate_app_init import read_manifest @pytest.fixture(scope='function', autouse=True) @@ -16,6 +17,13 @@ def clean_redis(): redis_client.flushall() +@pytest.fixture(scope='function', autouse=True) +def clean_read_manifest_lru_cache(): + read_manifest.cache_clear() + yield + read_manifest.cache_clear() + + @pytest.fixture(scope='function') def context(): return TestClient._context diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index 30753b6a..e97c6c28 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -1,3 +1,4 @@ +import json from unittest import mock import pytest @@ -15,6 +16,7 @@ StreamTimeRecord, ) from corva.models.task import RawTaskEvent +from corva.validate_app_init import read_manifest, validate_app_type_context raw_scheduled_natural_time_event = RawScheduledNaturalTimeEvent( asset_id=1, @@ -114,3 +116,44 @@ def lambda_handler(_event, _api): {}, context, ) + + +def test__validate_app_type_with_missed_app_type_at_manifested__does_not_fails(context): + mocked_manifest = {"application": {"type": None}} + + with mock.patch( + "corva.validate_app_init.read_manifest", return_value=mocked_manifest + ): + validate_app_type_context(aws_event="mocked", app_decorator_type="mocked") + + +def test__read_correct_manifest_file__success(context): + """Test when manifest.json exists and contains valid JSON.""" + manifest_payload = {"application": {"type": "stream"}} + manifest_payload_json = json.dumps(manifest_payload) + + with ( + mock.patch("os.path.exists", return_value=True), + mock.patch("builtins.open", mock.mock_open(read_data=manifest_payload_json)), + ): + result = read_manifest() + assert result == manifest_payload + + +def test__read_invalid_json_manifest_file__error(context): + """Test when manifest.json contains invalid JSON.""" + manifest_invalid_json = "{invalid: json}" + + with ( + mock.patch("os.path.exists", return_value=True), + mock.patch("builtins.open", mock.mock_open(read_data=manifest_invalid_json)), + ): + with pytest.raises(json.JSONDecodeError): + read_manifest() + + +def test__manifest_file_not_exists__success(context): + """Test when manifest.json contains invalid JSON.""" + with mock.patch("os.path.exists", return_value=False): + result = read_manifest() + assert result is None From 84a68a45a04331c9317190bafef9f914fba82aef Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 14:55:02 +0400 Subject: [PATCH 08/25] add matrix testing on different py versions; fix error with context manager for multiple with blocks --- .github/workflows/main.yml | 11 +++++++---- tests/unit/test_apps_event_types.py | 20 ++++++++------------ 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 5c886412..edb7690e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -22,16 +22,19 @@ jobs: run: make lint test: - name: Automated testing - + name: Automated Testing on Python ${{ matrix.python-version }} runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ 3.8, 3.9, 3.10, 3.11 ] # Define Python versions here steps: - uses: actions/checkout@v2 - - uses: actions/setup-python@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: ${{ matrix.python-version }} - name: Install run: make install-test diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index e97c6c28..2e6e7017 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -132,24 +132,20 @@ def test__read_correct_manifest_file__success(context): manifest_payload = {"application": {"type": "stream"}} manifest_payload_json = json.dumps(manifest_payload) - with ( - mock.patch("os.path.exists", return_value=True), - mock.patch("builtins.open", mock.mock_open(read_data=manifest_payload_json)), - ): - result = read_manifest() - assert result == manifest_payload + with mock.patch("os.path.exists", return_value=True): + with mock.patch("builtins.open", mock.mock_open(read_data=manifest_payload_json)): + result = read_manifest() + assert result == manifest_payload def test__read_invalid_json_manifest_file__error(context): """Test when manifest.json contains invalid JSON.""" manifest_invalid_json = "{invalid: json}" - with ( - mock.patch("os.path.exists", return_value=True), - mock.patch("builtins.open", mock.mock_open(read_data=manifest_invalid_json)), - ): - with pytest.raises(json.JSONDecodeError): - read_manifest() + with mock.patch("os.path.exists", return_value=True): + with mock.patch("builtins.open", mock.mock_open(read_data=manifest_invalid_json)): + with pytest.raises(json.JSONDecodeError): + read_manifest() def test__manifest_file_not_exists__success(context): From 473a152092d541c376658486b1852affdf799db4 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 14:56:49 +0400 Subject: [PATCH 09/25] fix workflow --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index edb7690e..7451a96f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [ 3.8, 3.9, 3.10, 3.11 ] # Define Python versions here + python-version: [3.8, 3.9, 3.10, 3.11] # Define Python versions here steps: - uses: actions/checkout@v2 From 837556d9e3c7babb908bbf1a210e6ea9dc6e9f15 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 14:58:25 +0400 Subject: [PATCH 10/25] fix workflow --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 7451a96f..1d52741d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -26,7 +26,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.8, 3.9, 3.10, 3.11] # Define Python versions here + python-version: ["3.8", "3.9", "3.10", "3.11"] steps: - uses: actions/checkout@v2 From c099de77e11f99d453b8cb77e41a2721f26c4c80 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:06:26 +0400 Subject: [PATCH 11/25] trying to fix pytest errors on newer version 6.2.5 for tests on py 3.10 & 3.11 --- requirements-test.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements-test.txt b/requirements-test.txt index 5b6e9aa7..ce3150d9 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,5 +1,5 @@ coverage==5.3 freezegun==1.0.0 -pytest==6.1.2 +pytest==6.2.5 pytest-mock==3.3.1 requests-mock==1.8.0 From c13c6a9ac3dcac682f28acda3594cf42e90bfdd7 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:15:12 +0400 Subject: [PATCH 12/25] fix linting --- tests/unit/test_apps_event_types.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index 2e6e7017..d3e63d52 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -133,7 +133,9 @@ def test__read_correct_manifest_file__success(context): manifest_payload_json = json.dumps(manifest_payload) with mock.patch("os.path.exists", return_value=True): - with mock.patch("builtins.open", mock.mock_open(read_data=manifest_payload_json)): + with mock.patch( + "builtins.open", mock.mock_open(read_data=manifest_payload_json) + ): result = read_manifest() assert result == manifest_payload @@ -143,7 +145,9 @@ def test__read_invalid_json_manifest_file__error(context): manifest_invalid_json = "{invalid: json}" with mock.patch("os.path.exists", return_value=True): - with mock.patch("builtins.open", mock.mock_open(read_data=manifest_invalid_json)): + with mock.patch( + "builtins.open", mock.mock_open(read_data=manifest_invalid_json) + ): with pytest.raises(json.JSONDecodeError): read_manifest() From 5f6b2486c44cf4171541d2929dea3ca528cf2ea0 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:18:20 +0400 Subject: [PATCH 13/25] small test improvement --- tests/unit/test_apps_event_types.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index d3e63d52..0598055d 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -73,13 +73,12 @@ def test__lambda_call_with_mismatched_event_type__raise_error( event_payload, context, ): + @app_decorator def lambda_handler(_event, _api): ... - particular_app = app_decorator(lambda_handler) - with pytest.raises(RuntimeError): - particular_app( + lambda_handler( event_payload.dict(), context, ) @@ -101,18 +100,17 @@ def test__lambda_with_mismatched_manifested_type__raise_error( manifested_app_type, context, ): + @app_decorator def lambda_handler(_event, _api): ... - particular_app = app_decorator(lambda_handler) - mocked_manifest = {"application": {"type": manifested_app_type}} with mock.patch( "corva.validate_app_init.read_manifest", return_value=mocked_manifest ): with pytest.raises(RuntimeError): - particular_app( + lambda_handler( {}, context, ) From 463950cc071902ed219837fca7d35b4ffc331049 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:20:36 +0400 Subject: [PATCH 14/25] ignore errors at matrix job running --- .github/workflows/main.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 1d52741d..0ca802ac 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -26,6 +26,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: + fail-fast: false python-version: ["3.8", "3.9", "3.10", "3.11"] steps: From 49837f7ab416de3eacf6cd38d2f7819866780848 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:26:00 +0400 Subject: [PATCH 15/25] some tests using continue-on-error=true --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 0ca802ac..871c895b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -24,9 +24,9 @@ jobs: test: name: Automated Testing on Python ${{ matrix.python-version }} runs-on: ubuntu-latest + continue-on-error: true strategy: matrix: - fail-fast: false python-version: ["3.8", "3.9", "3.10", "3.11"] steps: From 12f174e6c2d5379af752b325e43d00363f545e09 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:36:37 +0400 Subject: [PATCH 16/25] revert temp changes --- .github/workflows/main.yml | 12 ++++-------- requirements-test.txt | 2 +- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 871c895b..5c886412 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -22,20 +22,16 @@ jobs: run: make lint test: - name: Automated Testing on Python ${{ matrix.python-version }} + name: Automated testing + runs-on: ubuntu-latest - continue-on-error: true - strategy: - matrix: - python-version: ["3.8", "3.9", "3.10", "3.11"] steps: - uses: actions/checkout@v2 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v2 + - uses: actions/setup-python@v2 with: - python-version: ${{ matrix.python-version }} + python-version: 3.8 - name: Install run: make install-test diff --git a/requirements-test.txt b/requirements-test.txt index ce3150d9..5b6e9aa7 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,5 +1,5 @@ coverage==5.3 freezegun==1.0.0 -pytest==6.2.5 +pytest==6.1.2 pytest-mock==3.3.1 requests-mock==1.8.0 From c2c25dae519c58724369f8bf9391845869e4ce0d Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Mon, 27 Jan 2025 15:53:12 +0400 Subject: [PATCH 17/25] bump new minor version for corva python-sdk: update CHANGELOG.md & antora playbook --- CHANGELOG.md | 6 ++++++ docs/antora-playbook.yml | 2 +- src/version.py | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec1e0bd4..8164c9f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.12.1] - 2025-01-27 +### Fixed +- Add app logic for meaningful error when mismatched app type is used either declared at `manifest.json` or according + to used event payload passed to app + + ## [1.12.0] - 2024-10-06 ### Fixed - Unset the `CORVA_LOGGER.propagate = False`, so the OTel handler will be able to collect and send those logs as well diff --git a/docs/antora-playbook.yml b/docs/antora-playbook.yml index 9297d175..8064f2b3 100644 --- a/docs/antora-playbook.yml +++ b/docs/antora-playbook.yml @@ -7,7 +7,7 @@ content: start_path: docs branches: [] # branches: HEAD # Use this for local development - tags: [v1.12.0] + tags: [v1.12.1] asciidoc: attributes: page-toclevels: 5 diff --git a/src/version.py b/src/version.py index 5878cbea..48028b69 100644 --- a/src/version.py +++ b/src/version.py @@ -1 +1 @@ -VERSION = "1.12.0" +VERSION = "1.12.1" From c6c541f432c86bf5cce1eadbade127f6a41a0348 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Tue, 28 Jan 2025 19:44:40 +0400 Subject: [PATCH 18/25] feat: update SDK logic to be more transparent and optimal --- src/corva/handlers.py | 7 +---- src/corva/models/base.py | 14 ++++++++- src/corva/models/scheduled/raw.py | 6 +++- src/corva/models/stream/raw.py | 6 +++- src/corva/models/task.py | 6 +++- src/corva/validate_app_init.py | 45 +++++++++++++-------------- tests/unit/test_apps_event_types.py | 48 ++++++++++++++++++++++++++--- 7 files changed, 95 insertions(+), 37 deletions(-) diff --git a/src/corva/handlers.py b/src/corva/handlers.py index 66a8701b..8cf5d889 100644 --- a/src/corva/handlers.py +++ b/src/corva/handlers.py @@ -61,7 +61,6 @@ def base_handler( func: Callable, raw_event_type: Type[RawBaseEvent], handler: Optional[logging.Handler], - app_decorator_type: str, merge_events: bool = False, ) -> Callable[[Any, Any], List[Any]]: @functools.wraps(func) @@ -79,7 +78,6 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: custom_handler, ) = _get_custom_event_type_by_raw_aws_event(aws_event) specific_callable = custom_handler or func - try: context = CorvaContext.from_aws( aws_event=aws_event, aws_context=aws_context @@ -92,7 +90,7 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: if merge_events: aws_event = _merge_events(aws_event, data_transformation_type) - validate_app_type_context(aws_event, app_decorator_type) + validate_app_type_context(aws_event, raw_event_type) raw_events = data_transformation_type.from_raw_event(event=aws_event) if ( @@ -148,7 +146,6 @@ def stream( raw_event_type=RawStreamEvent, handler=handler, merge_events=merge_events, - app_decorator_type="stream", ) def wrapper( event: RawStreamEvent, @@ -258,7 +255,6 @@ def scheduled( raw_event_type=RawScheduledEvent, handler=handler, merge_events=merge_events, - app_decorator_type="scheduled", ) def wrapper( event: RawScheduledEvent, @@ -364,7 +360,6 @@ def task( base_handler, raw_event_type=RawTaskEvent, handler=handler, - app_decorator_type="task", ) def wrapper( event: RawTaskEvent, diff --git a/src/corva/models/base.py b/src/corva/models/base.py index 6b94e5da..ff316997 100644 --- a/src/corva/models/base.py +++ b/src/corva/models/base.py @@ -1,11 +1,18 @@ from __future__ import annotations import abc -from typing import Any, Sequence +from enum import Enum +from typing import Any, Sequence, Optional import pydantic +class AppType(str, Enum): + STREAM = "stream" + TASK = "task" + SCHEDULED = "scheduled" + + class CorvaBaseEvent(pydantic.BaseModel): class Config: extra = pydantic.Extra.allow @@ -13,7 +20,12 @@ class Config: class RawBaseEvent(abc.ABC): + @staticmethod @abc.abstractmethod def from_raw_event(event: Any) -> Sequence[RawBaseEvent]: pass + + @classmethod + def get_app_type(cls) -> Optional[AppType]: + return None diff --git a/src/corva/models/scheduled/raw.py b/src/corva/models/scheduled/raw.py index cc57b42a..b960b256 100644 --- a/src/corva/models/scheduled/raw.py +++ b/src/corva/models/scheduled/raw.py @@ -7,7 +7,7 @@ from corva.api import Api from corva.models import validators -from corva.models.base import CorvaBaseEvent, RawBaseEvent +from corva.models.base import CorvaBaseEvent, RawBaseEvent, AppType from corva.models.rerun import RerunDepth, RerunTime from corva.models.scheduled.scheduler_type import SchedulerType @@ -32,6 +32,10 @@ class RawScheduledEvent(CorvaBaseEvent, RawBaseEvent): scheduler_type: SchedulerType has_secrets: bool = False + @classmethod + def get_app_type(cls) -> AppType: + return AppType.SCHEDULED + @staticmethod def from_raw_event(event: Union[dict, List[List[dict]]]) -> List[RawScheduledEvent]: if isinstance(event, dict): diff --git a/src/corva/models/stream/raw.py b/src/corva/models/stream/raw.py index cf47be91..bc1206fd 100644 --- a/src/corva/models/stream/raw.py +++ b/src/corva/models/stream/raw.py @@ -7,7 +7,7 @@ import pydantic from corva.configuration import SETTINGS -from corva.models.base import CorvaBaseEvent, RawBaseEvent +from corva.models.base import CorvaBaseEvent, RawBaseEvent, AppType from corva.models.rerun import RerunDepth, RerunTime from corva.models.stream.initial import InitialStreamEvent from corva.models.stream.log_type import LogType @@ -86,6 +86,10 @@ class RawStreamEvent(CorvaBaseEvent, RawBaseEvent): # private attributes _max_record_value_cache_key: ClassVar[str] + @classmethod + def get_app_type(cls) -> AppType: + return AppType.STREAM + @property def app_connection_id(self) -> int: return getattr(self.metadata.apps, SETTINGS.APP_KEY).app_connection_id diff --git a/src/corva/models/task.py b/src/corva/models/task.py index 019e3031..a7ad5438 100644 --- a/src/corva/models/task.py +++ b/src/corva/models/task.py @@ -7,7 +7,7 @@ import requests from corva.api import Api -from corva.models.base import CorvaBaseEvent, RawBaseEvent +from corva.models.base import CorvaBaseEvent, RawBaseEvent, AppType class TaskStatus(enum.Enum): @@ -34,6 +34,10 @@ class RawTaskEvent(CorvaBaseEvent, RawBaseEvent): has_secrets: bool = False version: int = pydantic.Field(..., le=2, ge=2) # only utils API v2 supported + @classmethod + def get_app_type(cls) -> AppType: + return AppType.TASK + @staticmethod def from_raw_event(event: dict) -> List[RawTaskEvent]: return [pydantic.parse_obj_as(RawTaskEvent, event)] diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py index 238dd172..0f73cd63 100644 --- a/src/corva/validate_app_init.py +++ b/src/corva/validate_app_init.py @@ -6,17 +6,8 @@ import pydantic -from corva.models.base import RawBaseEvent +from corva.models.base import AppType, RawBaseEvent from corva.models.merge.raw import RawPartialRerunMergeEvent -from corva.models.scheduled.raw import RawScheduledEvent -from corva.models.stream.raw import RawStreamEvent -from corva.models.task import RawTaskEvent - -BASE_EVENT_CLS_TO_APP_TYPE_MAPPING: Dict[str, Type[RawBaseEvent]] = { - "task": RawTaskEvent, - "stream": RawStreamEvent, - "scheduled": RawScheduledEvent, -} def find_leaf_subclasses(base_class): @@ -38,16 +29,18 @@ def guess_event_type(aws_event: Any) -> Optional[Type[RawBaseEvent]]: return None -def validate_manifested_type(manifest: Dict[str, Any], app_decorator_type: str) -> None: - manifest_app_type = manifest.get("application", {}).get("type") - if manifest_app_type and manifest_app_type != app_decorator_type: +def validate_manifested_type( + manifest_app_type: AppType, + raw_event_type: Type[RawBaseEvent] +) -> None: + if manifest_app_type != raw_event_type.get_app_type(): raise RuntimeError( - f'Application with type "{manifest_app_type}" ' - f'can\'t invoke a "{app_decorator_type}" handler' + f'Application with type "{manifest_app_type.value}" ' + f'can\'t invoke a "{raw_event_type.get_app_type().value}" handler' ) -def validate_event_payload(aws_event, app_decorator_type) -> None: +def validate_event_payload(aws_event: Any, raw_event_type: Type[RawBaseEvent]) -> None: if event_cls := guess_event_type(aws_event): if issubclass(event_cls, RawPartialRerunMergeEvent): @@ -55,15 +48,14 @@ def validate_event_payload(aws_event, app_decorator_type) -> None: # it is not new app type itself it's just a run mode for existing app types return - expected_base_event_cls = BASE_EVENT_CLS_TO_APP_TYPE_MAPPING[app_decorator_type] - if not issubclass(event_cls, expected_base_event_cls): + if not issubclass(event_cls, raw_event_type): raise RuntimeError( - f'Application with type "{app_decorator_type}" ' + f'Application with type "{raw_event_type.get_app_type().value}" ' f'was invoked with "{event_cls}" event type' ) else: raise RuntimeError( - f'Application with type "{app_decorator_type}" ' + f'Application with type "{raw_event_type.get_app_type().value}" ' 'was invoked with "unknown" event type' ) @@ -77,8 +69,15 @@ def read_manifest() -> Optional[Dict[str, Any]]: return None -def validate_app_type_context(aws_event: Any, app_decorator_type: str) -> None: +def get_manifested_type() -> Optional[AppType]: if manifest := read_manifest(): - validate_manifested_type(manifest, app_decorator_type) + application_type = manifest.get("application", {}).get("type") + return AppType(application_type) if application_type else None + return None + + +def validate_app_type_context(aws_event: Any, raw_event_type: Type[RawBaseEvent]) -> None: + if manifested_type := get_manifested_type(): + validate_manifested_type(manifested_type, raw_event_type) else: - validate_event_payload(aws_event, app_decorator_type) + validate_event_payload(aws_event, raw_event_type) diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index 0598055d..5fc0c126 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -6,9 +6,10 @@ from corva.handlers import scheduled, stream, task from corva.models.scheduled.raw import ( RawScheduledDepthEvent, - RawScheduledNaturalTimeEvent, + RawScheduledNaturalTimeEvent, RawScheduledEvent, ) from corva.models.scheduled.scheduler_type import SchedulerType +from corva.models.stream.raw import RawStreamEvent from corva.models.stream.stream import ( StreamDepthEvent, StreamDepthRecord, @@ -16,7 +17,7 @@ StreamTimeRecord, ) from corva.models.task import RawTaskEvent -from corva.validate_app_init import read_manifest, validate_app_type_context +from corva.validate_app_init import read_manifest, validate_app_type_context, validate_event_payload raw_scheduled_natural_time_event = RawScheduledNaturalTimeEvent( asset_id=1, @@ -116,13 +117,52 @@ def lambda_handler(_event, _api): ) -def test__validate_app_type_with_missed_app_type_at_manifested__does_not_fails(context): +def test__if_manifested_app_type_is_none__payload_based_validation_called(context): mocked_manifest = {"application": {"type": None}} with mock.patch( "corva.validate_app_init.read_manifest", return_value=mocked_manifest ): - validate_app_type_context(aws_event="mocked", app_decorator_type="mocked") + with mock.patch( + "corva.validate_app_init.validate_event_payload", + wraps=validate_event_payload + ) as mocked_validate_event_payload: + validate_app_type_context( + aws_event=task_event.dict(), + raw_event_type=RawTaskEvent + ) + + mocked_validate_event_payload.assert_called_once() + + +def test__validate_app_type_with_wrong_app_type_at_manifest__raise_error(context): + mocked_manifest = {"application": {"type": "wrong_type"}} + + with mock.patch( + "corva.validate_app_init.read_manifest", return_value=mocked_manifest + ): + with pytest.raises(ValueError, match="'wrong_type' is not a valid AppType"): + validate_app_type_context(aws_event=None, raw_event_type=RawTaskEvent) + + +@pytest.mark.parametrize( + 'manifested_app_type, raw_base_event_type', + ( + ("task", RawTaskEvent), + ("stream", RawStreamEvent), + ("scheduled", RawScheduledEvent), + ), +) +def test__right_manifested_app_type_and_raw_event_type_passed__success( + manifested_app_type, + raw_base_event_type, +): + mocked_manifest = {"application": {"type": manifested_app_type}} + + with mock.patch( + "corva.validate_app_init.read_manifest", return_value=mocked_manifest + ): + validate_app_type_context(aws_event="any", raw_event_type=raw_base_event_type) def test__read_correct_manifest_file__success(context): From 306b0815f679b169c1f4d41d8454ff4b2bfc63ce Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Tue, 28 Jan 2025 19:46:04 +0400 Subject: [PATCH 19/25] feat: fix linting --- src/corva/models/base.py | 3 +-- src/corva/models/scheduled/raw.py | 2 +- src/corva/models/stream/raw.py | 2 +- src/corva/models/task.py | 2 +- src/corva/validate_app_init.py | 7 ++++--- tests/unit/test_apps_event_types.py | 16 ++++++++++------ 6 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/corva/models/base.py b/src/corva/models/base.py index ff316997..4f786565 100644 --- a/src/corva/models/base.py +++ b/src/corva/models/base.py @@ -2,7 +2,7 @@ import abc from enum import Enum -from typing import Any, Sequence, Optional +from typing import Any, Optional, Sequence import pydantic @@ -20,7 +20,6 @@ class Config: class RawBaseEvent(abc.ABC): - @staticmethod @abc.abstractmethod def from_raw_event(event: Any) -> Sequence[RawBaseEvent]: diff --git a/src/corva/models/scheduled/raw.py b/src/corva/models/scheduled/raw.py index b960b256..6c4f6c83 100644 --- a/src/corva/models/scheduled/raw.py +++ b/src/corva/models/scheduled/raw.py @@ -7,7 +7,7 @@ from corva.api import Api from corva.models import validators -from corva.models.base import CorvaBaseEvent, RawBaseEvent, AppType +from corva.models.base import AppType, CorvaBaseEvent, RawBaseEvent from corva.models.rerun import RerunDepth, RerunTime from corva.models.scheduled.scheduler_type import SchedulerType diff --git a/src/corva/models/stream/raw.py b/src/corva/models/stream/raw.py index bc1206fd..12d4c0b6 100644 --- a/src/corva/models/stream/raw.py +++ b/src/corva/models/stream/raw.py @@ -7,7 +7,7 @@ import pydantic from corva.configuration import SETTINGS -from corva.models.base import CorvaBaseEvent, RawBaseEvent, AppType +from corva.models.base import AppType, CorvaBaseEvent, RawBaseEvent from corva.models.rerun import RerunDepth, RerunTime from corva.models.stream.initial import InitialStreamEvent from corva.models.stream.log_type import LogType diff --git a/src/corva/models/task.py b/src/corva/models/task.py index a7ad5438..b7198b29 100644 --- a/src/corva/models/task.py +++ b/src/corva/models/task.py @@ -7,7 +7,7 @@ import requests from corva.api import Api -from corva.models.base import CorvaBaseEvent, RawBaseEvent, AppType +from corva.models.base import AppType, CorvaBaseEvent, RawBaseEvent class TaskStatus(enum.Enum): diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py index 0f73cd63..f47e6410 100644 --- a/src/corva/validate_app_init.py +++ b/src/corva/validate_app_init.py @@ -30,8 +30,7 @@ def guess_event_type(aws_event: Any) -> Optional[Type[RawBaseEvent]]: def validate_manifested_type( - manifest_app_type: AppType, - raw_event_type: Type[RawBaseEvent] + manifest_app_type: AppType, raw_event_type: Type[RawBaseEvent] ) -> None: if manifest_app_type != raw_event_type.get_app_type(): raise RuntimeError( @@ -76,7 +75,9 @@ def get_manifested_type() -> Optional[AppType]: return None -def validate_app_type_context(aws_event: Any, raw_event_type: Type[RawBaseEvent]) -> None: +def validate_app_type_context( + aws_event: Any, raw_event_type: Type[RawBaseEvent] +) -> None: if manifested_type := get_manifested_type(): validate_manifested_type(manifested_type, raw_event_type) else: diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index 5fc0c126..628a7a36 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -6,7 +6,8 @@ from corva.handlers import scheduled, stream, task from corva.models.scheduled.raw import ( RawScheduledDepthEvent, - RawScheduledNaturalTimeEvent, RawScheduledEvent, + RawScheduledEvent, + RawScheduledNaturalTimeEvent, ) from corva.models.scheduled.scheduler_type import SchedulerType from corva.models.stream.raw import RawStreamEvent @@ -17,7 +18,11 @@ StreamTimeRecord, ) from corva.models.task import RawTaskEvent -from corva.validate_app_init import read_manifest, validate_app_type_context, validate_event_payload +from corva.validate_app_init import ( + read_manifest, + validate_app_type_context, + validate_event_payload, +) raw_scheduled_natural_time_event = RawScheduledNaturalTimeEvent( asset_id=1, @@ -124,12 +129,11 @@ def test__if_manifested_app_type_is_none__payload_based_validation_called(contex "corva.validate_app_init.read_manifest", return_value=mocked_manifest ): with mock.patch( - "corva.validate_app_init.validate_event_payload", - wraps=validate_event_payload + "corva.validate_app_init.validate_event_payload", + wraps=validate_event_payload, ) as mocked_validate_event_payload: validate_app_type_context( - aws_event=task_event.dict(), - raw_event_type=RawTaskEvent + aws_event=task_event.dict(), raw_event_type=RawTaskEvent ) mocked_validate_event_payload.assert_called_once() From 2209c201a90a4e2a5111189a11e9c5dc206edce5 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Tue, 28 Jan 2025 19:49:25 +0400 Subject: [PATCH 20/25] fix: linter --- src/corva/models/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/corva/models/base.py b/src/corva/models/base.py index 4f786565..ce2bdd5c 100644 --- a/src/corva/models/base.py +++ b/src/corva/models/base.py @@ -2,7 +2,7 @@ import abc from enum import Enum -from typing import Any, Optional, Sequence +from typing import Any, Sequence import pydantic @@ -26,5 +26,5 @@ def from_raw_event(event: Any) -> Sequence[RawBaseEvent]: pass @classmethod - def get_app_type(cls) -> Optional[AppType]: - return None + def get_app_type(cls) -> AppType: + ... From a5c970acb9aeaa571e08fdfcc9767bc315fc8bae Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Thu, 30 Jan 2025 18:54:35 +0400 Subject: [PATCH 21/25] add some fixes; code improvements --- src/corva/handlers.py | 40 +++++---- src/corva/validate_app_init.py | 12 +-- tests/unit/test_apps_event_types.py | 96 ++++++++++++++-------- tests/unit/test_partial_rerun_merge_app.py | 8 +- 4 files changed, 89 insertions(+), 67 deletions(-) diff --git a/src/corva/handlers.py b/src/corva/handlers.py index 8cf5d889..ae0fdab9 100644 --- a/src/corva/handlers.py +++ b/src/corva/handlers.py @@ -41,7 +41,7 @@ StreamEventT = TypeVar("StreamEventT", bound=StreamEvent) ScheduledEventT = TypeVar("ScheduledEventT", bound=ScheduledEvent) HANDLERS: Dict[Type[RawBaseEvent], Callable] = {} -GENERIC_APP_EVENT_TYPES = [RawStreamEvent, RawScheduledEvent, RawTaskEvent] +GENERIC_APP_EVENT_TYPES = (RawStreamEvent, RawScheduledEvent, RawTaskEvent,) def get_cache_key( @@ -73,11 +73,31 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: user_handler=handler, logger=CORVA_LOGGER, ) as logging_ctx: + # Verify either current call from app_decorator or not + # for instance from partial rerun merge ( raw_custom_event_type, custom_handler, ) = _get_custom_event_type_by_raw_aws_event(aws_event) - specific_callable = custom_handler or func + is_direct_app_call: bool = not custom_handler + data_transformation_type = raw_custom_event_type or raw_event_type + if merge_events: + aws_event = _merge_events(aws_event, data_transformation_type) + + if is_direct_app_call: + # Means current app call is not RawPartialRerunMergeEvent or similar + validate_app_type_context(aws_event, raw_event_type) + + if ( + is_direct_app_call + and data_transformation_type not in GENERIC_APP_EVENT_TYPES + ): + CORVA_LOGGER.warning( + f"Handler for {data_transformation_type.__name__!r} " + f"event not found. Skipping..." + ) + return [] + try: context = CorvaContext.from_aws( aws_event=aws_event, aws_context=aws_context @@ -86,22 +106,8 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: redis_client = redis.Redis.from_url( url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1 ) - data_transformation_type = raw_custom_event_type or raw_event_type - if merge_events: - aws_event = _merge_events(aws_event, data_transformation_type) - - validate_app_type_context(aws_event, raw_event_type) raw_events = data_transformation_type.from_raw_event(event=aws_event) - - if ( - custom_handler is None - and data_transformation_type not in GENERIC_APP_EVENT_TYPES - ): - CORVA_LOGGER.warning( - f"Handler for {data_transformation_type.__name__!r} " - f"event not found. Skipping..." - ) - return [] + specific_callable = custom_handler or func results = [ specific_callable( diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py index f47e6410..3a72dba2 100644 --- a/src/corva/validate_app_init.py +++ b/src/corva/validate_app_init.py @@ -42,24 +42,14 @@ def validate_manifested_type( def validate_event_payload(aws_event: Any, raw_event_type: Type[RawBaseEvent]) -> None: if event_cls := guess_event_type(aws_event): - if issubclass(event_cls, RawPartialRerunMergeEvent): - # RawPartialRerunMergeEvent(-s) should be ignored here since - # it is not new app type itself it's just a run mode for existing app types - return - if not issubclass(event_cls, raw_event_type): raise RuntimeError( f'Application with type "{raw_event_type.get_app_type().value}" ' f'was invoked with "{event_cls}" event type' ) - else: - raise RuntimeError( - f'Application with type "{raw_event_type.get_app_type().value}" ' - 'was invoked with "unknown" event type' - ) -@functools.lru_cache(maxsize=None) +@functools.lru_cache(maxsize=1) def read_manifest() -> Optional[Dict[str, Any]]: manifest_json_path = os.path.join(os.getcwd(), "manifest.json") if os.path.exists(manifest_json_path): diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index 628a7a36..74b071c7 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -3,6 +3,7 @@ import pytest +from corva.configuration import SETTINGS from corva.handlers import scheduled, stream, task from corva.models.scheduled.raw import ( RawScheduledDepthEvent, @@ -10,13 +11,9 @@ RawScheduledNaturalTimeEvent, ) from corva.models.scheduled.scheduler_type import SchedulerType -from corva.models.stream.raw import RawStreamEvent -from corva.models.stream.stream import ( - StreamDepthEvent, - StreamDepthRecord, - StreamTimeEvent, - StreamTimeRecord, -) +from corva.models.stream.log_type import LogType +from corva.models.stream.raw import RawStreamEvent, RawStreamTimeEvent, RawTimeRecord, RawMetadata, RawAppMetadata, \ + RawStreamDepthEvent, RawDepthRecord from corva.models.task import RawTaskEvent from corva.validate_app_init import ( read_manifest, @@ -25,38 +22,69 @@ ) raw_scheduled_natural_time_event = RawScheduledNaturalTimeEvent( - asset_id=1, - interval=1, - schedule=1, - app_connection=1, - app_stream=1, - company=1, + asset_id=int(), + interval=int(), + schedule=int(), + app_connection=int(), + app_stream=int(), + company=int(), scheduler_type=SchedulerType.natural_time, - schedule_start=1, + schedule_start=int(), +).dict( + by_alias=True, + exclude_unset=True, ) raw_scheduled_depth_event = RawScheduledDepthEvent( - asset_id=1, - depth_milestone=1.0, - schedule=1, - app_connection=1, - app_stream=1, - company=1, + asset_id=int(), + depth_milestone=float(), + schedule=int(), + app_connection=int(), + app_stream=int(), + company=int(), scheduler_type=SchedulerType.data_depth_milestone, top_depth=0.0, bottom_depth=1.0, log_identifier='', +).dict( + by_alias=True, + exclude_unset=True, ) -stream_time_event = StreamTimeEvent( - asset_id=0, company_id=0, records=[StreamTimeRecord(timestamp=0)] -) - -stream_depth_event = StreamDepthEvent( - asset_id=0, company_id=0, records=[StreamDepthRecord(measured_depth=0)] -) +stream_time_event = RawStreamTimeEvent( + records=[ + RawTimeRecord( + asset_id=0, + company_id=int(), + collection=str(), + timestamp=int(), + ), + ], + metadata=RawMetadata( + app_stream_id=int(), + apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)}, + log_type=LogType.time, + ), +).dict() + +stream_depth_event = RawStreamDepthEvent( + records=[ + RawDepthRecord( + asset_id=int(), + company_id=int(), + collection=str(), + measured_depth=float(), + ) + ], + metadata=RawMetadata( + app_stream_id=int(), + apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=int())}, + log_type=LogType.depth, + log_identifier='log_identifier', + ), +).dict() -task_event = RawTaskEvent(task_id='0', version=2) +task_event = RawTaskEvent(task_id='0', version=2).dict() @pytest.mark.parametrize( @@ -64,11 +92,11 @@ ( (task, raw_scheduled_natural_time_event), (task, raw_scheduled_depth_event), - (task, stream_time_event), - (task, stream_depth_event), + (task, [stream_time_event]), + (task, [stream_depth_event]), (scheduled, task_event), - (scheduled, stream_time_event), - (scheduled, stream_depth_event), + (scheduled, [stream_time_event]), + (scheduled, [stream_depth_event]), (stream, task_event), (stream, raw_scheduled_natural_time_event), (stream, raw_scheduled_depth_event), @@ -85,7 +113,7 @@ def lambda_handler(_event, _api): with pytest.raises(RuntimeError): lambda_handler( - event_payload.dict(), + event_payload, context, ) @@ -133,7 +161,7 @@ def test__if_manifested_app_type_is_none__payload_based_validation_called(contex wraps=validate_event_payload, ) as mocked_validate_event_payload: validate_app_type_context( - aws_event=task_event.dict(), raw_event_type=RawTaskEvent + aws_event=task_event, raw_event_type=RawTaskEvent ) mocked_validate_event_payload.assert_called_once() diff --git a/tests/unit/test_partial_rerun_merge_app.py b/tests/unit/test_partial_rerun_merge_app.py index 9282caf7..ccbdf8e0 100644 --- a/tests/unit/test_partial_rerun_merge_app.py +++ b/tests/unit/test_partial_rerun_merge_app.py @@ -3,6 +3,7 @@ from uuid import uuid4 import pytest +from pydantic import ValidationError import corva @@ -49,12 +50,9 @@ def partial_rerun_merge_app(event, api, asset_cache, rerun_asset_cache): raw_event = dict(RAW_EVENT) raw_event["event_type"] = "unknown_event_type" - with pytest.raises(RuntimeError) as e: + with pytest.raises(ValidationError) as e: stream_app(raw_event, context) - assert ( - 'Application with type "stream" was invoked with "unknown" event type' - in str(e.value) - ) + assert "validation error" in str(e.value) def test_merge_event_handler_called_from_stream_app_returns_expected_cache_values( From f442cd099f19548d2d315551e48aff6239ba2db4 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Thu, 30 Jan 2025 18:55:43 +0400 Subject: [PATCH 22/25] fix format --- src/corva/handlers.py | 10 +++++++--- src/corva/validate_app_init.py | 1 - tests/unit/test_apps_event_types.py | 15 ++++++++++----- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/corva/handlers.py b/src/corva/handlers.py index ae0fdab9..480c4124 100644 --- a/src/corva/handlers.py +++ b/src/corva/handlers.py @@ -41,7 +41,11 @@ StreamEventT = TypeVar("StreamEventT", bound=StreamEvent) ScheduledEventT = TypeVar("ScheduledEventT", bound=ScheduledEvent) HANDLERS: Dict[Type[RawBaseEvent], Callable] = {} -GENERIC_APP_EVENT_TYPES = (RawStreamEvent, RawScheduledEvent, RawTaskEvent,) +GENERIC_APP_EVENT_TYPES = ( + RawStreamEvent, + RawScheduledEvent, + RawTaskEvent, +) def get_cache_key( @@ -89,8 +93,8 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: validate_app_type_context(aws_event, raw_event_type) if ( - is_direct_app_call - and data_transformation_type not in GENERIC_APP_EVENT_TYPES + is_direct_app_call + and data_transformation_type not in GENERIC_APP_EVENT_TYPES ): CORVA_LOGGER.warning( f"Handler for {data_transformation_type.__name__!r} " diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py index 3a72dba2..d953b06e 100644 --- a/src/corva/validate_app_init.py +++ b/src/corva/validate_app_init.py @@ -7,7 +7,6 @@ import pydantic from corva.models.base import AppType, RawBaseEvent -from corva.models.merge.raw import RawPartialRerunMergeEvent def find_leaf_subclasses(base_class): diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index 74b071c7..ad39984f 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -12,8 +12,15 @@ ) from corva.models.scheduled.scheduler_type import SchedulerType from corva.models.stream.log_type import LogType -from corva.models.stream.raw import RawStreamEvent, RawStreamTimeEvent, RawTimeRecord, RawMetadata, RawAppMetadata, \ - RawStreamDepthEvent, RawDepthRecord +from corva.models.stream.raw import ( + RawAppMetadata, + RawDepthRecord, + RawMetadata, + RawStreamDepthEvent, + RawStreamEvent, + RawStreamTimeEvent, + RawTimeRecord, +) from corva.models.task import RawTaskEvent from corva.validate_app_init import ( read_manifest, @@ -160,9 +167,7 @@ def test__if_manifested_app_type_is_none__payload_based_validation_called(contex "corva.validate_app_init.validate_event_payload", wraps=validate_event_payload, ) as mocked_validate_event_payload: - validate_app_type_context( - aws_event=task_event, raw_event_type=RawTaskEvent - ) + validate_app_type_context(aws_event=task_event, raw_event_type=RawTaskEvent) mocked_validate_event_payload.assert_called_once() From 21fce8c957fb2702bc1751c410db84616a5ab364 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Fri, 31 Jan 2025 15:14:52 +0400 Subject: [PATCH 23/25] fix typo for manifest.json scheduled app type --- src/corva/models/base.py | 2 +- src/corva/models/scheduled/raw.py | 2 +- tests/unit/test_apps_event_types.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/corva/models/base.py b/src/corva/models/base.py index ce2bdd5c..670b143c 100644 --- a/src/corva/models/base.py +++ b/src/corva/models/base.py @@ -10,7 +10,7 @@ class AppType(str, Enum): STREAM = "stream" TASK = "task" - SCHEDULED = "scheduled" + SCHEDULER = "scheduler" class CorvaBaseEvent(pydantic.BaseModel): diff --git a/src/corva/models/scheduled/raw.py b/src/corva/models/scheduled/raw.py index 6c4f6c83..fb4c6ef5 100644 --- a/src/corva/models/scheduled/raw.py +++ b/src/corva/models/scheduled/raw.py @@ -34,7 +34,7 @@ class RawScheduledEvent(CorvaBaseEvent, RawBaseEvent): @classmethod def get_app_type(cls) -> AppType: - return AppType.SCHEDULED + return AppType.SCHEDULER @staticmethod def from_raw_event(event: Union[dict, List[List[dict]]]) -> List[RawScheduledEvent]: diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py index ad39984f..7684d0dc 100644 --- a/tests/unit/test_apps_event_types.py +++ b/tests/unit/test_apps_event_types.py @@ -129,11 +129,11 @@ def lambda_handler(_event, _api): 'app_decorator,manifested_app_type', ( (task, "stream"), - (task, "scheduled"), + (task, "scheduler"), (scheduled, "task"), (scheduled, "stream"), (stream, "task"), - (stream, "scheduled"), + (stream, "scheduler"), ), ) def test__lambda_with_mismatched_manifested_type__raise_error( @@ -187,7 +187,7 @@ def test__validate_app_type_with_wrong_app_type_at_manifest__raise_error(context ( ("task", RawTaskEvent), ("stream", RawStreamEvent), - ("scheduled", RawScheduledEvent), + ("scheduler", RawScheduledEvent), ), ) def test__right_manifested_app_type_and_raw_event_type_passed__success( From b221f8b5f070092ddc4ff683f64a73c4e9203afe Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Fri, 7 Feb 2025 15:19:55 +0400 Subject: [PATCH 24/25] fix: update code according to last PR comments requested --- src/corva/handlers.py | 8 ++++---- src/corva/models/base.py | 4 ---- src/corva/models/scheduled/raw.py | 6 +----- src/corva/models/stream/raw.py | 6 +----- src/corva/models/task.py | 6 +----- src/corva/validate_app_init.py | 24 +++++++++++++++++------- 6 files changed, 24 insertions(+), 30 deletions(-) diff --git a/src/corva/handlers.py b/src/corva/handlers.py index 480c4124..0e2b5720 100644 --- a/src/corva/handlers.py +++ b/src/corva/handlers.py @@ -88,10 +88,6 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: if merge_events: aws_event = _merge_events(aws_event, data_transformation_type) - if is_direct_app_call: - # Means current app call is not RawPartialRerunMergeEvent or similar - validate_app_type_context(aws_event, raw_event_type) - if ( is_direct_app_call and data_transformation_type not in GENERIC_APP_EVENT_TYPES @@ -102,6 +98,10 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: ) return [] + if is_direct_app_call: + # Means current app call is not RawPartialRerunMergeEvent or similar + validate_app_type_context(aws_event, raw_event_type) + try: context = CorvaContext.from_aws( aws_event=aws_event, aws_context=aws_context diff --git a/src/corva/models/base.py b/src/corva/models/base.py index 670b143c..489c6ae1 100644 --- a/src/corva/models/base.py +++ b/src/corva/models/base.py @@ -24,7 +24,3 @@ class RawBaseEvent(abc.ABC): @abc.abstractmethod def from_raw_event(event: Any) -> Sequence[RawBaseEvent]: pass - - @classmethod - def get_app_type(cls) -> AppType: - ... diff --git a/src/corva/models/scheduled/raw.py b/src/corva/models/scheduled/raw.py index fb4c6ef5..cc57b42a 100644 --- a/src/corva/models/scheduled/raw.py +++ b/src/corva/models/scheduled/raw.py @@ -7,7 +7,7 @@ from corva.api import Api from corva.models import validators -from corva.models.base import AppType, CorvaBaseEvent, RawBaseEvent +from corva.models.base import CorvaBaseEvent, RawBaseEvent from corva.models.rerun import RerunDepth, RerunTime from corva.models.scheduled.scheduler_type import SchedulerType @@ -32,10 +32,6 @@ class RawScheduledEvent(CorvaBaseEvent, RawBaseEvent): scheduler_type: SchedulerType has_secrets: bool = False - @classmethod - def get_app_type(cls) -> AppType: - return AppType.SCHEDULER - @staticmethod def from_raw_event(event: Union[dict, List[List[dict]]]) -> List[RawScheduledEvent]: if isinstance(event, dict): diff --git a/src/corva/models/stream/raw.py b/src/corva/models/stream/raw.py index 12d4c0b6..cf47be91 100644 --- a/src/corva/models/stream/raw.py +++ b/src/corva/models/stream/raw.py @@ -7,7 +7,7 @@ import pydantic from corva.configuration import SETTINGS -from corva.models.base import AppType, CorvaBaseEvent, RawBaseEvent +from corva.models.base import CorvaBaseEvent, RawBaseEvent from corva.models.rerun import RerunDepth, RerunTime from corva.models.stream.initial import InitialStreamEvent from corva.models.stream.log_type import LogType @@ -86,10 +86,6 @@ class RawStreamEvent(CorvaBaseEvent, RawBaseEvent): # private attributes _max_record_value_cache_key: ClassVar[str] - @classmethod - def get_app_type(cls) -> AppType: - return AppType.STREAM - @property def app_connection_id(self) -> int: return getattr(self.metadata.apps, SETTINGS.APP_KEY).app_connection_id diff --git a/src/corva/models/task.py b/src/corva/models/task.py index b7198b29..019e3031 100644 --- a/src/corva/models/task.py +++ b/src/corva/models/task.py @@ -7,7 +7,7 @@ import requests from corva.api import Api -from corva.models.base import AppType, CorvaBaseEvent, RawBaseEvent +from corva.models.base import CorvaBaseEvent, RawBaseEvent class TaskStatus(enum.Enum): @@ -34,10 +34,6 @@ class RawTaskEvent(CorvaBaseEvent, RawBaseEvent): has_secrets: bool = False version: int = pydantic.Field(..., le=2, ge=2) # only utils API v2 supported - @classmethod - def get_app_type(cls) -> AppType: - return AppType.TASK - @staticmethod def from_raw_event(event: dict) -> List[RawTaskEvent]: return [pydantic.parse_obj_as(RawTaskEvent, event)] diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py index d953b06e..f19761c9 100644 --- a/src/corva/validate_app_init.py +++ b/src/corva/validate_app_init.py @@ -7,6 +7,15 @@ import pydantic from corva.models.base import AppType, RawBaseEvent +from corva.models.scheduled.raw import RawScheduledEvent +from corva.models.stream.raw import RawStreamEvent +from corva.models.task import RawTaskEvent + +MANIFESTED_APP_TYPE_TO_RAW_BASE_EVENT = { + AppType.TASK: RawTaskEvent, + AppType.STREAM: RawStreamEvent, + AppType.SCHEDULER: RawScheduledEvent +} def find_leaf_subclasses(base_class): @@ -19,7 +28,7 @@ def find_leaf_subclasses(base_class): return leaf_classes -def guess_event_type(aws_event: Any) -> Optional[Type[RawBaseEvent]]: +def get_event_type(aws_event: Any) -> Optional[Type[RawBaseEvent]]: all_children = find_leaf_subclasses(RawBaseEvent) for child_cls in all_children: with contextlib.suppress(pydantic.ValidationError): @@ -29,21 +38,22 @@ def guess_event_type(aws_event: Any) -> Optional[Type[RawBaseEvent]]: def validate_manifested_type( - manifest_app_type: AppType, raw_event_type: Type[RawBaseEvent] + manifest_app_type: 'AppType', raw_event_type: Type[RawBaseEvent] ) -> None: - if manifest_app_type != raw_event_type.get_app_type(): + expected_base_event_cls = MANIFESTED_APP_TYPE_TO_RAW_BASE_EVENT[manifest_app_type] + if expected_base_event_cls != raw_event_type: raise RuntimeError( f'Application with type "{manifest_app_type.value}" ' - f'can\'t invoke a "{raw_event_type.get_app_type().value}" handler' + f'can\'t invoke a "{raw_event_type}" handler' ) def validate_event_payload(aws_event: Any, raw_event_type: Type[RawBaseEvent]) -> None: - if event_cls := guess_event_type(aws_event): + if event_cls := get_event_type(aws_event): if not issubclass(event_cls, raw_event_type): raise RuntimeError( - f'Application with type "{raw_event_type.get_app_type().value}" ' + f'Application with type "{raw_event_type}" ' f'was invoked with "{event_cls}" event type' ) @@ -57,7 +67,7 @@ def read_manifest() -> Optional[Dict[str, Any]]: return None -def get_manifested_type() -> Optional[AppType]: +def get_manifested_type() -> Optional['AppType']: if manifest := read_manifest(): application_type = manifest.get("application", {}).get("type") return AppType(application_type) if application_type else None From 2dea5227a148619794090a3ab56df16cc12dd020 Mon Sep 17 00:00:00 2001 From: Dmytro Kosse <9990225+kossman@users.noreply.github.com> Date: Fri, 7 Feb 2025 15:20:22 +0400 Subject: [PATCH 25/25] fix: linting --- src/corva/validate_app_init.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py index f19761c9..92d2a1ef 100644 --- a/src/corva/validate_app_init.py +++ b/src/corva/validate_app_init.py @@ -14,7 +14,7 @@ MANIFESTED_APP_TYPE_TO_RAW_BASE_EVENT = { AppType.TASK: RawTaskEvent, AppType.STREAM: RawStreamEvent, - AppType.SCHEDULER: RawScheduledEvent + AppType.SCHEDULER: RawScheduledEvent, }