diff --git a/CHANGELOG.md b/CHANGELOG.md index ec1e0bd4..8164c9f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.12.1] - 2025-01-27 +### Fixed +- Add app logic for meaningful error when mismatched app type is used either declared at `manifest.json` or according + to used event payload passed to app + + ## [1.12.0] - 2024-10-06 ### Fixed - Unset the `CORVA_LOGGER.propagate = False`, so the OTel handler will be able to collect and send those logs as well diff --git a/docs/antora-playbook.yml b/docs/antora-playbook.yml index 9297d175..8064f2b3 100644 --- a/docs/antora-playbook.yml +++ b/docs/antora-playbook.yml @@ -7,7 +7,7 @@ content: start_path: docs branches: [] # branches: HEAD # Use this for local development - tags: [v1.12.0] + tags: [v1.12.1] asciidoc: attributes: page-toclevels: 5 diff --git a/src/corva/handlers.py b/src/corva/handlers.py index e86d2cec..0e2b5720 100644 --- a/src/corva/handlers.py +++ b/src/corva/handlers.py @@ -36,11 +36,16 @@ from corva.service import service from corva.service.api_sdk import CachingApiSdk, CorvaApiSdk from corva.service.cache_sdk import FakeInternalCacheSdk, InternalRedisSdk, UserRedisSdk +from corva.validate_app_init import validate_app_type_context StreamEventT = TypeVar("StreamEventT", bound=StreamEvent) ScheduledEventT = TypeVar("ScheduledEventT", bound=ScheduledEvent) HANDLERS: Dict[Type[RawBaseEvent], Callable] = {} -GENERIC_APP_EVENT_TYPES = [RawStreamEvent, RawScheduledEvent, RawTaskEvent] +GENERIC_APP_EVENT_TYPES = ( + RawStreamEvent, + RawScheduledEvent, + RawTaskEvent, +) def get_cache_key( @@ -72,11 +77,30 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: user_handler=handler, logger=CORVA_LOGGER, ) as logging_ctx: + # Verify either current call from app_decorator or not + # for instance from partial rerun merge ( raw_custom_event_type, custom_handler, ) = 
_get_custom_event_type_by_raw_aws_event(aws_event) - specific_callable = custom_handler or func + is_direct_app_call: bool = not custom_handler + data_transformation_type = raw_custom_event_type or raw_event_type + if merge_events: + aws_event = _merge_events(aws_event, data_transformation_type) + + if ( + is_direct_app_call + and data_transformation_type not in GENERIC_APP_EVENT_TYPES + ): + CORVA_LOGGER.warning( + f"Handler for {data_transformation_type.__name__!r} " + f"event not found. Skipping..." + ) + return [] + + if is_direct_app_call: + # Means current app call is not RawPartialRerunMergeEvent or similar + validate_app_type_context(aws_event, raw_event_type) try: context = CorvaContext.from_aws( @@ -86,20 +110,8 @@ def wrapper(aws_event: Any, aws_context: Any) -> List[Any]: redis_client = redis.Redis.from_url( url=SETTINGS.CACHE_URL, decode_responses=True, max_connections=1 ) - data_transformation_type = raw_custom_event_type or raw_event_type - if merge_events: - aws_event = _merge_events(aws_event, data_transformation_type) raw_events = data_transformation_type.from_raw_event(event=aws_event) - - if ( - custom_handler is None - and data_transformation_type not in GENERIC_APP_EVENT_TYPES - ): - CORVA_LOGGER.warning( - f"Handler for {data_transformation_type.__name__!r} " - f"event not found. Skipping..." 
- ) - return [] + specific_callable = custom_handler or func results = [ specific_callable( @@ -354,7 +366,11 @@ def task( return functools.partial(task, handler=handler) @functools.wraps(func) - @functools.partial(base_handler, raw_event_type=RawTaskEvent, handler=handler) + @functools.partial( + base_handler, + raw_event_type=RawTaskEvent, + handler=handler, + ) def wrapper( event: RawTaskEvent, api_key: str, diff --git a/src/corva/models/base.py b/src/corva/models/base.py index 6b94e5da..489c6ae1 100644 --- a/src/corva/models/base.py +++ b/src/corva/models/base.py @@ -1,11 +1,18 @@ from __future__ import annotations import abc +from enum import Enum from typing import Any, Sequence import pydantic +class AppType(str, Enum): + STREAM = "stream" + TASK = "task" + SCHEDULER = "scheduler" + + class CorvaBaseEvent(pydantic.BaseModel): class Config: extra = pydantic.Extra.allow diff --git a/src/corva/validate_app_init.py b/src/corva/validate_app_init.py new file mode 100644 index 00000000..92d2a1ef --- /dev/null +++ b/src/corva/validate_app_init.py @@ -0,0 +1,83 @@ +import contextlib +import functools +import json +import os +from typing import Any, Dict, Optional, Type + +import pydantic + +from corva.models.base import AppType, RawBaseEvent +from corva.models.scheduled.raw import RawScheduledEvent +from corva.models.stream.raw import RawStreamEvent +from corva.models.task import RawTaskEvent + +MANIFESTED_APP_TYPE_TO_RAW_BASE_EVENT = { + AppType.TASK: RawTaskEvent, + AppType.STREAM: RawStreamEvent, + AppType.SCHEDULER: RawScheduledEvent, +} + + +def find_leaf_subclasses(base_class): + leaf_classes = [] + for subclass in base_class.__subclasses__(): + if not subclass.__subclasses__(): + leaf_classes.append(subclass) + else: + leaf_classes.extend(find_leaf_subclasses(subclass)) + return leaf_classes + + +def get_event_type(aws_event: Any) -> Optional[Type[RawBaseEvent]]: + all_children = find_leaf_subclasses(RawBaseEvent) + for child_cls in all_children: + with 
contextlib.suppress(pydantic.ValidationError): + child_cls.from_raw_event(aws_event) + return child_cls + return None + + +def validate_manifested_type( + manifest_app_type: 'AppType', raw_event_type: Type[RawBaseEvent] +) -> None: + expected_base_event_cls = MANIFESTED_APP_TYPE_TO_RAW_BASE_EVENT[manifest_app_type] + if expected_base_event_cls != raw_event_type: + raise RuntimeError( + f'Application with type "{manifest_app_type.value}" ' + f'can\'t invoke a "{raw_event_type}" handler' + ) + + +def validate_event_payload(aws_event: Any, raw_event_type: Type[RawBaseEvent]) -> None: + + if event_cls := get_event_type(aws_event): + if not issubclass(event_cls, raw_event_type): + raise RuntimeError( + f'Application with type "{raw_event_type}" ' + f'was invoked with "{event_cls}" event type' + ) + + +@functools.lru_cache(maxsize=1) +def read_manifest() -> Optional[Dict[str, Any]]: + manifest_json_path = os.path.join(os.getcwd(), "manifest.json") + if os.path.exists(manifest_json_path): + with open(manifest_json_path, "r") as manifest_json_file: + return json.load(manifest_json_file) + return None + + +def get_manifested_type() -> Optional['AppType']: + if manifest := read_manifest(): + application_type = manifest.get("application", {}).get("type") + return AppType(application_type) if application_type else None + return None + + +def validate_app_type_context( + aws_event: Any, raw_event_type: Type[RawBaseEvent] +) -> None: + if manifested_type := get_manifested_type(): + validate_manifested_type(manifested_type, raw_event_type) + else: + validate_event_payload(aws_event, raw_event_type) diff --git a/src/version.py b/src/version.py index 5878cbea..48028b69 100644 --- a/src/version.py +++ b/src/version.py @@ -1 +1 @@ -VERSION = "1.12.0" +VERSION = "1.12.1" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 71461bb8..7ebfaaa6 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -3,6 +3,7 @@ from corva.configuration import SETTINGS from 
corva.testing import TestClient +from corva.validate_app_init import read_manifest @pytest.fixture(scope='function', autouse=True) @@ -16,6 +17,13 @@ def clean_redis(): redis_client.flushall() +@pytest.fixture(scope='function', autouse=True) +def clean_read_manifest_lru_cache(): + read_manifest.cache_clear() + yield + read_manifest.cache_clear() + + @pytest.fixture(scope='function') def context(): return TestClient._context diff --git a/tests/unit/test_apps_event_types.py b/tests/unit/test_apps_event_types.py new file mode 100644 index 00000000..7684d0dc --- /dev/null +++ b/tests/unit/test_apps_event_types.py @@ -0,0 +1,234 @@ +import json +from unittest import mock + +import pytest + +from corva.configuration import SETTINGS +from corva.handlers import scheduled, stream, task +from corva.models.scheduled.raw import ( + RawScheduledDepthEvent, + RawScheduledEvent, + RawScheduledNaturalTimeEvent, +) +from corva.models.scheduled.scheduler_type import SchedulerType +from corva.models.stream.log_type import LogType +from corva.models.stream.raw import ( + RawAppMetadata, + RawDepthRecord, + RawMetadata, + RawStreamDepthEvent, + RawStreamEvent, + RawStreamTimeEvent, + RawTimeRecord, +) +from corva.models.task import RawTaskEvent +from corva.validate_app_init import ( + read_manifest, + validate_app_type_context, + validate_event_payload, +) + +raw_scheduled_natural_time_event = RawScheduledNaturalTimeEvent( + asset_id=int(), + interval=int(), + schedule=int(), + app_connection=int(), + app_stream=int(), + company=int(), + scheduler_type=SchedulerType.natural_time, + schedule_start=int(), +).dict( + by_alias=True, + exclude_unset=True, +) + +raw_scheduled_depth_event = RawScheduledDepthEvent( + asset_id=int(), + depth_milestone=float(), + schedule=int(), + app_connection=int(), + app_stream=int(), + company=int(), + scheduler_type=SchedulerType.data_depth_milestone, + top_depth=0.0, + bottom_depth=1.0, + log_identifier='', +).dict( + by_alias=True, + exclude_unset=True, 
+) + +stream_time_event = RawStreamTimeEvent( + records=[ + RawTimeRecord( + asset_id=0, + company_id=int(), + collection=str(), + timestamp=int(), + ), + ], + metadata=RawMetadata( + app_stream_id=int(), + apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=1)}, + log_type=LogType.time, + ), +).dict() + +stream_depth_event = RawStreamDepthEvent( + records=[ + RawDepthRecord( + asset_id=int(), + company_id=int(), + collection=str(), + measured_depth=float(), + ) + ], + metadata=RawMetadata( + app_stream_id=int(), + apps={SETTINGS.APP_KEY: RawAppMetadata(app_connection_id=int())}, + log_type=LogType.depth, + log_identifier='log_identifier', + ), +).dict() + +task_event = RawTaskEvent(task_id='0', version=2).dict() + + +@pytest.mark.parametrize( + 'app_decorator,event_payload', + ( + (task, raw_scheduled_natural_time_event), + (task, raw_scheduled_depth_event), + (task, [stream_time_event]), + (task, [stream_depth_event]), + (scheduled, task_event), + (scheduled, [stream_time_event]), + (scheduled, [stream_depth_event]), + (stream, task_event), + (stream, raw_scheduled_natural_time_event), + (stream, raw_scheduled_depth_event), + ), +) +def test__lambda_call_with_mismatched_event_type__raise_error( + app_decorator, + event_payload, + context, +): + @app_decorator + def lambda_handler(_event, _api): + ... + + with pytest.raises(RuntimeError): + lambda_handler( + event_payload, + context, + ) + + +@pytest.mark.parametrize( + 'app_decorator,manifested_app_type', + ( + (task, "stream"), + (task, "scheduler"), + (scheduled, "task"), + (scheduled, "stream"), + (stream, "task"), + (stream, "scheduler"), + ), +) +def test__lambda_with_mismatched_manifested_type__raise_error( + app_decorator, + manifested_app_type, + context, +): + @app_decorator + def lambda_handler(_event, _api): + ... 
+ + mocked_manifest = {"application": {"type": manifested_app_type}} + + with mock.patch( + "corva.validate_app_init.read_manifest", return_value=mocked_manifest + ): + with pytest.raises(RuntimeError): + lambda_handler( + {}, + context, + ) + + +def test__if_manifested_app_type_is_none__payload_based_validation_called(context): + mocked_manifest = {"application": {"type": None}} + + with mock.patch( + "corva.validate_app_init.read_manifest", return_value=mocked_manifest + ): + with mock.patch( + "corva.validate_app_init.validate_event_payload", + wraps=validate_event_payload, + ) as mocked_validate_event_payload: + validate_app_type_context(aws_event=task_event, raw_event_type=RawTaskEvent) + + mocked_validate_event_payload.assert_called_once() + + +def test__validate_app_type_with_wrong_app_type_at_manifest__raise_error(context): + mocked_manifest = {"application": {"type": "wrong_type"}} + + with mock.patch( + "corva.validate_app_init.read_manifest", return_value=mocked_manifest + ): + with pytest.raises(ValueError, match="'wrong_type' is not a valid AppType"): + validate_app_type_context(aws_event=None, raw_event_type=RawTaskEvent) + + +@pytest.mark.parametrize( + 'manifested_app_type, raw_base_event_type', + ( + ("task", RawTaskEvent), + ("stream", RawStreamEvent), + ("scheduler", RawScheduledEvent), + ), +) +def test__right_manifested_app_type_and_raw_event_type_passed__success( + manifested_app_type, + raw_base_event_type, +): + mocked_manifest = {"application": {"type": manifested_app_type}} + + with mock.patch( + "corva.validate_app_init.read_manifest", return_value=mocked_manifest + ): + validate_app_type_context(aws_event="any", raw_event_type=raw_base_event_type) + + +def test__read_correct_manifest_file__success(context): + """Test when manifest.json exists and contains valid JSON.""" + manifest_payload = {"application": {"type": "stream"}} + manifest_payload_json = json.dumps(manifest_payload) + + with mock.patch("os.path.exists", return_value=True): 
+        with mock.patch(
+            "builtins.open", mock.mock_open(read_data=manifest_payload_json)
+        ):
+            result = read_manifest()
+            assert result == manifest_payload
+
+
+def test__read_invalid_json_manifest_file__error(context):
+    """Test when manifest.json contains invalid JSON."""
+    manifest_invalid_json = "{invalid: json}"
+
+    with mock.patch("os.path.exists", return_value=True):
+        with mock.patch(
+            "builtins.open", mock.mock_open(read_data=manifest_invalid_json)
+        ):
+            with pytest.raises(json.JSONDecodeError):
+                read_manifest()
+
+
+def test__manifest_file_not_exists__success(context):
+    """Test when manifest.json does not exist."""
+    with mock.patch("os.path.exists", return_value=False):
+        result = read_manifest()
+        assert result is None
diff --git a/tests/unit/test_partial_rerun_merge_app.py b/tests/unit/test_partial_rerun_merge_app.py
index 615a666b..ccbdf8e0 100644
--- a/tests/unit/test_partial_rerun_merge_app.py
+++ b/tests/unit/test_partial_rerun_merge_app.py
@@ -49,6 +49,7 @@ def partial_rerun_merge_app(event, api, asset_cache, rerun_asset_cache):
     raw_event = dict(RAW_EVENT)
     raw_event["event_type"] = "unknown_event_type"
+
     with pytest.raises(ValidationError) as e:
         stream_app(raw_event, context)
     assert "validation error" in str(e.value)