From 04067bbd8ce18e1644a057d0c0dc684726e795fa Mon Sep 17 00:00:00 2001 From: Alex Skyrta Date: Wed, 16 Apr 2025 16:46:27 +0300 Subject: [PATCH 1/4] fix: change how we generate start/end times for ScheduledDataTime events if merge_events param is True --- src/corva/handlers.py | 27 ++++++++++-- src/corva/models/scheduled/raw.py | 20 +++++++++ tests/unit/test_scheduled_app.py | 71 +++++++++++++++++++++++++++++++ 3 files changed, 115 insertions(+), 3 deletions(-) diff --git a/src/corva/handlers.py b/src/corva/handlers.py index 0e2b5720..f6ad7e83 100644 --- a/src/corva/handlers.py +++ b/src/corva/handlers.py @@ -23,11 +23,12 @@ from corva.api import Api from corva.configuration import SETTINGS from corva.logger import CORVA_LOGGER, CorvaLoggerHandler, LoggingContext +from corva.models import validators from corva.models.base import RawBaseEvent from corva.models.context import CorvaContext from corva.models.merge.merge import PartialRerunMergeEvent from corva.models.merge.raw import RawPartialRerunMergeEvent -from corva.models.scheduled.raw import RawScheduledEvent +from corva.models.scheduled.raw import RawScheduledDataTimeEvent, RawScheduledEvent from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent from corva.models.scheduled.scheduler_type import SchedulerType from corva.models.stream.raw import RawStreamEvent @@ -300,6 +301,11 @@ def wrapper( hash_name=hash_name, redis_client=redis_client ) + if isinstance(event, RawScheduledDataTimeEvent) and event.merge_metadata: + event = event.rebuild_with_modified_times( + event.merge_metadata.start_time, event.merge_metadata.end_time + ) + app_event = event.scheduler_type.event.parse_obj(event) with LoggingContext( @@ -589,7 +595,9 @@ def _merge_events( """ Merges incoming aws_events into one. Merge happens differently, depending on app type. - Only "scheduled" and "stream" type of apps can be processed here. + Only "scheduled" (data and depth) and "stream" type of apps can be processed here. 
+ Scheduled Natural events don't need a merge since they will never receive multiple + events in a batch. If somehow any other type is passed - raise an exception """ if not isinstance( @@ -610,11 +618,24 @@ def _merge_events( if is_depth else ("schedule_start", "schedule_end") ) - min_event_start = min(e[event_start] for e in aws_event) + min_event_start: int = min(e[event_start] for e in aws_event) max_event_end = max( (e[event_end] for e in aws_event if e.get(event_end) is not None), default=None, ) + if not is_depth: + # we're working with ScheduledDataTimeEvent + max_event_start: int = max(e[event_start] for e in aws_event) + interval = aws_event[0]["interval"] + # cast from ms to s if needed + min_value = validators.from_ms_to_s(min_event_start) + max_value = validators.from_ms_to_s(max_event_start) + aws_event[0]["merge_metadata"] = { + "start_time": int(min_value - interval + 1), + "end_time": int(max_value), + "number_of_merged_events": len(aws_event), + } + aws_event[0][event_start] = min_event_start if max_event_end: aws_event[0][event_end] = max_event_end diff --git a/src/corva/models/scheduled/raw.py b/src/corva/models/scheduled/raw.py index cc57b42a..c9f07e06 100644 --- a/src/corva/models/scheduled/raw.py +++ b/src/corva/models/scheduled/raw.py @@ -56,6 +56,17 @@ def set_schedule_as_completed(self, api: Api) -> None: api.post(path=f'scheduler/{self.schedule_id}/completed') +class DataTimeMergeMetadata(CorvaBaseEvent): + """ + For data time events we may need to store information about event merging + (if merge_events=True is used in @scheduled) + """ + + start_time: int + end_time: int + number_of_merged_events: int + + class RawScheduledDataTimeEvent(RawScheduledEvent): """Raw data time scheduled event data. 
@@ -71,6 +82,7 @@ class RawScheduledDataTimeEvent(RawScheduledEvent): start_time: int = None # type: ignore interval: float rerun: Optional[RerunTime] = None + merge_metadata: Optional[DataTimeMergeMetadata] = None # validators _set_schedule_start = pydantic.validator('schedule_start', allow_reuse=True)( @@ -84,6 +96,14 @@ def set_start_time(cls, values: dict) -> dict: values["start_time"] = int(values["schedule_start"] - values["interval"] + 1) return values + def rebuild_with_modified_times( + self, start_time: int, end_time: int + ) -> RawScheduledDataTimeEvent: + raw_dict = self.dict(exclude_none=True, by_alias=True) + raw_dict["start_time"] = start_time + raw_dict["end_time"] = end_time + return RawScheduledDataTimeEvent.parse_obj(raw_dict) + class RawScheduledDepthEvent(RawScheduledEvent): """Raw depth scheduled event data. diff --git a/tests/unit/test_scheduled_app.py b/tests/unit/test_scheduled_app.py index 31f85fd8..5f78e7b3 100644 --- a/tests/unit/test_scheduled_app.py +++ b/tests/unit/test_scheduled_app.py @@ -1,5 +1,6 @@ import logging import re +from copy import deepcopy import pytest import redis @@ -532,3 +533,73 @@ def scheduled_app(event, api, cache): with pytest.raises(redis.exceptions.ConnectionError): scheduled_app(event, context) + + +@pytest.mark.parametrize("merge_events", [True, False]) +def test_merge_events_parameter(merge_events, context, mocker): + @scheduled(merge_events=merge_events) + def scheduled_app(event, api, state): + # For this test, just return the processed event list for inspection. + return event + + # Create two separate events with different schedule_start values. + # Note: schedule_start is provided in milliseconds. 
+ event1 = RawScheduledDataTimeEvent( + asset_id=1, + interval=60, # interval value in seconds + schedule=123, + schedule_start=1744718400000, # 2025-04-15T12:00:00 in milliseconds + schedule_end=1744718460000, # 2025-04-15T12:01:00 in milliseconds + app_connection=1, + app_stream=2, + company=1, + scheduler_type=SchedulerType.data_time, + ).dict(by_alias=True, exclude_unset=True) + + event2 = RawScheduledDataTimeEvent( + asset_id=1, + interval=60, + schedule=124, + schedule_start=1744718460000, # 2025-04-15T12:01:00 in milliseconds + schedule_end=1744718520000, # 2025-04-15T12:02:00 in milliseconds + app_connection=1, + app_stream=2, + company=1, + scheduler_type=SchedulerType.data_time, + ).dict(by_alias=True, exclude_unset=True) + original_event1 = deepcopy(event1) + + # Combine the events in the input structure + events = [[event1, event2]] + + # Call the scheduled app which should process the events. + result = scheduled_app(events, context) + + if merge_events: + # When merging is enabled, the app should merge the input events + # into a single event. + assert ( + len(result) == 1 + ), "Expected a single merged event when merge_events is true." + merged_event = result[0] + + # Calculate the expected start_time and end_time. + expected_start_time = ( + original_event1["schedule_start"] - original_event1["interval"] + 1 + ) + expected_end_time = event2["schedule_start"] + + assert ( + merged_event.start_time == expected_start_time + ), f"Expected time {expected_start_time}, got {merged_event.start_time}." + # The merged event is expected to have an 'end_time' attribute set. + actual_end_time = getattr(merged_event, "end_time", None) + assert ( + actual_end_time == expected_end_time + ), f"Expected merged end_time {expected_end_time}, got {actual_end_time}." + else: + # When merging is disabled, the app should return the events as-is + # (i.e. two separate events). + assert ( + len(result) == 2 + ), "Expected two separate events when merge_events is false." 
From cb29870a3deab83cdb6b1a548ea65320ababbfde Mon Sep 17 00:00:00 2001 From: Alex Skyrta Date: Wed, 16 Apr 2025 17:04:41 +0300 Subject: [PATCH 2/4] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f80e36d9..45e4ef35 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +### Fixed +- merge_events parameter for scheduled data time apps should result in correct start/end times in a final app event. ## [1.13.1] - 2025-02-17 ### Fixed From ef491ddffb399fe999fd7ec4f8e6564b7095fcfd Mon Sep 17 00:00:00 2001 From: Alex Skyrta Date: Wed, 16 Apr 2025 17:11:56 +0300 Subject: [PATCH 3/4] fix: fix old unit test according to new logic --- tests/unit/test_docs/test_merging.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_docs/test_merging.py b/tests/unit/test_docs/test_merging.py index f5713f7e..ed8084f7 100644 --- a/tests/unit/test_docs/test_merging.py +++ b/tests/unit/test_docs/test_merging.py @@ -70,8 +70,8 @@ def test_tutorial001(context): @pytest.mark.parametrize( "time_ranges, flat", ( - [((60, 120), (61, None), (62, 122)), True], - [((61, None), (60, None), (62, None)), False], + [((60, 120), (120, 180), (180, 240)), True], + [((120, 180), (60, 120), (180, 240)), False], ), ) def test_tutorial002(context, time_ranges, flat): @@ -97,6 +97,6 @@ def test_tutorial002(context, time_ranges, flat): result_event: ScheduledDataTimeEvent = tutorial002.app(event, context)[0] assert result_event.start_time == 1 - assert result_event.end_time == 60 + assert result_event.end_time == 180 max_schedule_value = time_ranges[-1][-1] assert result_event.schedule_end == max_schedule_value # type: ignore[attr-defined] From 2f0c78de01b649301142a852d3f4d0323c999520 Mon Sep 17 00:00:00 2001 From: 
Alex Skyrta Date: Thu, 17 Apr 2025 12:57:07 +0300 Subject: [PATCH 4/4] prepare for release --- CHANGELOG.md | 6 +++++- docs/antora-playbook.yml | 2 +- docs/antora.yml | 2 +- src/version.py | 2 +- 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 45e4ef35..936bf4b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] + +## [1.14.0] - 2025-04-17 ### Fixed - merge_events parameter for scheduled data time apps should result in correct start/end times in a final app event. @@ -403,7 +405,9 @@ env variables, that should be used to configure logging. - Event classes: `StreamEvent`, `ScheduledEvent` and `TaskEvent`. -[Unreleased] https://github.com/corva-ai/python-sdk/compare/v1.13.0...master +[Unreleased] https://github.com/corva-ai/python-sdk/compare/v1.14.0...master +[1.14.0] https://github.com/corva-ai/python-sdk/compare/v1.13.1...v1.14.0 +[1.13.1] https://github.com/corva-ai/python-sdk/compare/v1.13.0...v1.13.1 [1.13.0] https://github.com/corva-ai/python-sdk/compare/v1.12.1...v1.13.0 [1.12.1] https://github.com/corva-ai/python-sdk/compare/v1.12.0...v1.12.1 [1.12.0] https://github.com/corva-ai/python-sdk/compare/v1.11.4...v1.12.0 diff --git a/docs/antora-playbook.yml b/docs/antora-playbook.yml index 0214d45c..e8b6546e 100644 --- a/docs/antora-playbook.yml +++ b/docs/antora-playbook.yml @@ -7,7 +7,7 @@ content: start_path: docs branches: [] # branches: HEAD # Use this for local development - tags: [v1.13.1] + tags: [v1.14.0] asciidoc: attributes: page-toclevels: 5 diff --git a/docs/antora.yml b/docs/antora.yml index da9cec15..1e5f554e 100644 --- a/docs/antora.yml +++ b/docs/antora.yml @@ -1,3 +1,3 @@ name: corva-sdk -version: ~ +version: 1.14.0 nav: [modules/ROOT/nav.adoc] diff --git a/src/version.py b/src/version.py index 
3094cc85..c1230ddf 100644 --- a/src/version.py +++ b/src/version.py @@ -1 +1 @@ -VERSION = "1.13.1" +VERSION = "1.14.0"