diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py index 7e3ed0f1ad..86896ee491 100644 --- a/tests/core/test_scheduler.py +++ b/tests/core/test_scheduler.py @@ -722,3 +722,90 @@ def signal_b(batch: DatetimeRanges): c: [], d: [], } + + +def test_signals_and_lookback(mocker: MockerFixture, make_snapshot, get_batched_missing_intervals): + @signal() + def signal_base(batch: DatetimeRanges): + return [batch[0]] + + signals = signal.get_registry() + + a = make_snapshot( + load_sql_based_model( + parse( # type: ignore + """ + MODEL ( + name base, + kind INCREMENTAL_BY_TIME_RANGE( + lookback 1, + time_column dt, + ), + start '2023-01-01', + signals SIGNAL_BASE(), + ); + + SELECT @start_date AS dt; + """ + ), + signal_definitions=signals, + ), + ) + + b = make_snapshot( + load_sql_based_model( + parse( # type: ignore + """ + MODEL ( + name b, + kind INCREMENTAL_BY_TIME_RANGE( + lookback 1, + time_column dt, + ), + start '2023-01-01' + ); + + SELECT @start_date AS dt; + """ + ), + signal_definitions=signals, + ) + ) + + c = make_snapshot( + load_sql_based_model( + parse( # type: ignore + """ + MODEL ( + name c, + kind INCREMENTAL_BY_TIME_RANGE( + lookback 1, + time_column dt, + ), + start '2023-01-01', + ); + + SELECT * FROM a UNION SELECT * FROM b + """ + ), + signal_definitions=signals, + ), + nodes={a.name: a.model, b.name: b.model}, + ) + + snapshot_evaluator = SnapshotEvaluator(adapters=mocker.MagicMock(), ddl_concurrent_tasks=1) + scheduler = Scheduler( + snapshots=[a, b, c], + snapshot_evaluator=snapshot_evaluator, + state_sync=mocker.MagicMock(), + max_workers=2, + default_catalog=None, + ) + + batches = get_batched_missing_intervals(scheduler, "2023-01-01", "2023-01-03", None) + + assert batches == { + a: [(to_timestamp("2023-01-01"), to_timestamp("2023-01-02"))], + b: [(to_timestamp("2023-01-01"), to_timestamp("2023-01-04"))], + c: [(to_timestamp("2023-01-01"), to_timestamp("2023-01-02"))], + }