Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.14.0] - 2025-04-17
### Fixed
- The merge_events parameter for scheduled data time apps now produces correct start/end times in the final merged app event.

## [1.13.1] - 2025-02-17
### Fixed
- Documentation 404 at GitHub pages
Expand Down Expand Up @@ -401,7 +405,9 @@ env variables, that should be used to configure logging.
- Event classes: `StreamEvent`, `ScheduledEvent` and `TaskEvent`.


[Unreleased] https://github.com/corva-ai/python-sdk/compare/v1.13.0...master
[Unreleased] https://github.com/corva-ai/python-sdk/compare/v1.14.0...master
[1.14.0] https://github.com/corva-ai/python-sdk/compare/v1.13.1...v1.14.0
[1.13.1] https://github.com/corva-ai/python-sdk/compare/v1.13.0...v1.13.1
[1.13.0] https://github.com/corva-ai/python-sdk/compare/v1.12.1...v1.13.0
[1.12.1] https://github.com/corva-ai/python-sdk/compare/v1.12.0...v1.12.1
[1.12.0] https://github.com/corva-ai/python-sdk/compare/v1.11.4...v1.12.0
Expand Down
2 changes: 1 addition & 1 deletion docs/antora-playbook.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ content:
start_path: docs
branches: []
# branches: HEAD # Use this for local development
tags: [v1.13.1]
tags: [v1.14.0]
asciidoc:
attributes:
page-toclevels: 5
Expand Down
2 changes: 1 addition & 1 deletion docs/antora.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
name: corva-sdk
version: ~
version: 1.14.0
nav: [modules/ROOT/nav.adoc]
27 changes: 24 additions & 3 deletions src/corva/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
from corva.api import Api
from corva.configuration import SETTINGS
from corva.logger import CORVA_LOGGER, CorvaLoggerHandler, LoggingContext
from corva.models import validators
from corva.models.base import RawBaseEvent
from corva.models.context import CorvaContext
from corva.models.merge.merge import PartialRerunMergeEvent
from corva.models.merge.raw import RawPartialRerunMergeEvent
from corva.models.scheduled.raw import RawScheduledEvent
from corva.models.scheduled.raw import RawScheduledDataTimeEvent, RawScheduledEvent
from corva.models.scheduled.scheduled import ScheduledEvent, ScheduledNaturalTimeEvent
from corva.models.scheduled.scheduler_type import SchedulerType
from corva.models.stream.raw import RawStreamEvent
Expand Down Expand Up @@ -300,6 +301,11 @@ def wrapper(
hash_name=hash_name, redis_client=redis_client
)

if isinstance(event, RawScheduledDataTimeEvent) and event.merge_metadata:
event = event.rebuild_with_modified_times(
event.merge_metadata.start_time, event.merge_metadata.end_time
)

app_event = event.scheduler_type.event.parse_obj(event)

with LoggingContext(
Expand Down Expand Up @@ -589,7 +595,9 @@ def _merge_events(
"""
Merges incoming aws_events into one.
Merge happens differently, depending on app type.
Only "scheduled" and "stream" type of apps can be processed here.
Only "scheduled"(data and depth) and "stream" type of apps can be processed here.
Scheduled Natural events don't need a merge since they will never receive multiple
events in batch.
If somehow any other type is passed - raise an exception
"""
if not isinstance(
Expand All @@ -610,11 +618,24 @@ def _merge_events(
if is_depth
else ("schedule_start", "schedule_end")
)
min_event_start = min(e[event_start] for e in aws_event)
min_event_start: int = min(e[event_start] for e in aws_event)
max_event_end = max(
(e[event_end] for e in aws_event if e.get(event_end) is not None),
default=None,
)
if not is_depth:
# we're working with ScheduledDataTimeEvent
max_event_start: int = max(e[event_start] for e in aws_event)
interval = aws_event[0]["interval"]
# cast from ms to s if needed
min_value = validators.from_ms_to_s(min_event_start)
max_value = validators.from_ms_to_s(max_event_start)
aws_event[0]["merge_metadata"] = {
"start_time": int(min_value - interval + 1),
"end_time": int(max_value),
"number_of_merged_events": len(aws_event),
}

aws_event[0][event_start] = min_event_start
if max_event_end:
aws_event[0][event_end] = max_event_end
Expand Down
20 changes: 20 additions & 0 deletions src/corva/models/scheduled/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ def set_schedule_as_completed(self, api: Api) -> None:
api.post(path=f'scheduler/{self.schedule_id}/completed')


class DataTimeMergeMetadata(CorvaBaseEvent):
    """Metadata describing how a batch of data time events was merged.

    Populated only when ``merge_events=True`` is passed to ``@scheduled``
    and multiple data time events arrive in a single batch; otherwise the
    owning event's ``merge_metadata`` stays ``None``.
    """

    # Start of the merged interval, in seconds:
    # min(schedule_start) across the batch, minus interval, plus 1.
    start_time: int
    # End of the merged interval, in seconds: max(schedule_start) across the batch.
    end_time: int
    # Number of raw events that were collapsed into the single merged event.
    number_of_merged_events: int


class RawScheduledDataTimeEvent(RawScheduledEvent):
"""Raw data time scheduled event data.

Expand All @@ -71,6 +82,7 @@ class RawScheduledDataTimeEvent(RawScheduledEvent):
start_time: int = None # type: ignore
interval: float
rerun: Optional[RerunTime] = None
merge_metadata: Optional[DataTimeMergeMetadata] = None

# validators
_set_schedule_start = pydantic.validator('schedule_start', allow_reuse=True)(
Expand All @@ -84,6 +96,14 @@ def set_start_time(cls, values: dict) -> dict:
values["start_time"] = int(values["schedule_start"] - values["interval"] + 1)
return values

def rebuild_with_modified_times(
    self, start_time: int, end_time: int
) -> RawScheduledDataTimeEvent:
    """Return a re-validated copy of this event with overridden times.

    The event is serialized back to a raw dict (aliases kept, ``None``
    fields dropped), the ``start_time``/``end_time`` keys are replaced,
    and the result is parsed again so all model validators re-run.
    """
    payload = {
        **self.dict(exclude_none=True, by_alias=True),
        "start_time": start_time,
        "end_time": end_time,
    }
    return RawScheduledDataTimeEvent.parse_obj(payload)


class RawScheduledDepthEvent(RawScheduledEvent):
"""Raw depth scheduled event data.
Expand Down
2 changes: 1 addition & 1 deletion src/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = "1.13.1"
VERSION = "1.14.0"
6 changes: 3 additions & 3 deletions tests/unit/test_docs/test_merging.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ def test_tutorial001(context):
@pytest.mark.parametrize(
"time_ranges, flat",
(
[((60, 120), (61, None), (62, 122)), True],
[((61, None), (60, None), (62, None)), False],
[((60, 120), (120, 180), (180, 240)), True],
[((120, 180), (60, 120), (180, 240)), False],
),
)
def test_tutorial002(context, time_ranges, flat):
Expand All @@ -97,6 +97,6 @@ def test_tutorial002(context, time_ranges, flat):
result_event: ScheduledDataTimeEvent = tutorial002.app(event, context)[0]

assert result_event.start_time == 1
assert result_event.end_time == 60
assert result_event.end_time == 180
max_schedule_value = time_ranges[-1][-1]
assert result_event.schedule_end == max_schedule_value # type: ignore[attr-defined]
71 changes: 71 additions & 0 deletions tests/unit/test_scheduled_app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import re
from copy import deepcopy

import pytest
import redis
Expand Down Expand Up @@ -532,3 +533,73 @@ def scheduled_app(event, api, cache):

with pytest.raises(redis.exceptions.ConnectionError):
scheduled_app(event, context)


@pytest.mark.parametrize("merge_events", [True, False])
def test_merge_events_parameter(merge_events, context):
    """Verify the merge_events flag of @scheduled for data time apps.

    With merge_events=True a batch of events must be collapsed into one
    event whose start_time/end_time cover the whole batch; with
    merge_events=False each event must be delivered separately.

    NOTE: the unused ``mocker`` fixture was removed — nothing in this
    test patches or spies on anything.
    """

    @scheduled(merge_events=merge_events)
    def scheduled_app(event, api, state):
        # For this test, just return the processed event list for inspection.
        return event

    # Create two separate events with different schedule_start values.
    # Note: schedule_start is provided in milliseconds; model validators
    # convert it to seconds, so the dicts below hold second values.
    event1 = RawScheduledDataTimeEvent(
        asset_id=1,
        interval=60,  # interval value in seconds
        schedule=123,
        schedule_start=1744718400000,  # 2025-04-15T12:00:00 in milliseconds
        schedule_end=1744718460000,  # 2025-04-15T12:01:00 in milliseconds
        app_connection=1,
        app_stream=2,
        company=1,
        scheduler_type=SchedulerType.data_time,
    ).dict(by_alias=True, exclude_unset=True)

    event2 = RawScheduledDataTimeEvent(
        asset_id=1,
        interval=60,
        schedule=124,
        schedule_start=1744718460000,  # 2025-04-15T12:01:00 in milliseconds
        schedule_end=1744718520000,  # 2025-04-15T12:02:00 in milliseconds
        app_connection=1,
        app_stream=2,
        company=1,
        scheduler_type=SchedulerType.data_time,
    ).dict(by_alias=True, exclude_unset=True)
    # Merging mutates the first event dict in place, so keep a pristine
    # copy for computing expected values.
    original_event1 = deepcopy(event1)

    # Combine the events in the input structure
    events = [[event1, event2]]

    # Call the scheduled app which should process the events.
    result = scheduled_app(events, context)

    if merge_events:
        # When merging is enabled, the app should merge the input events
        # into a single event.
        assert (
            len(result) == 1
        ), "Expected a single merged event when merge_events is true."
        merged_event = result[0]

        # Calculate the expected start_time and end_time.
        expected_start_time = (
            original_event1["schedule_start"] - original_event1["interval"] + 1
        )
        expected_end_time = event2["schedule_start"]

        assert (
            merged_event.start_time == expected_start_time
        ), f"Expected time {expected_start_time}, got {merged_event.start_time}."
        # The merged event is expected to have an 'end_time' attribute set.
        actual_end_time = getattr(merged_event, "end_time", None)
        assert (
            actual_end_time == expected_end_time
        ), f"Expected merged end_time {expected_end_time}, got {actual_end_time}."
    else:
        # When merging is disabled, the app should return the events as-is
        # (i.e. two separate events).
        assert (
            len(result) == 2
        ), "Expected two separate events when merge_events is false."