From 0a8a3492da41b8735d2e715035c67ab5ab9e5537 Mon Sep 17 00:00:00 2001
From: Erin Drummond
Date: Tue, 4 Mar 2025 21:45:38 +0000
Subject: [PATCH] Feat: Allow opting out of the time_column being automatically
 added to the partitioned_by columns

---
 docs/concepts/models/model_kinds.md | 19 ++++++++++++++
 sqlmesh/core/model/definition.py | 18 ++++++++-----
 sqlmesh/core/model/kind.py | 7 +++++
 ...4_add_partition_by_time_column_property.py | 6 +++++
 tests/core/test_model.py | 26 +++++++++++++++++++
 tests/core/test_snapshot.py | 5 ++--
 .../github/cicd/test_integration.py | 10 +++----
 tests/schedulers/airflow/test_client.py | 1 +
 8 files changed, 79 insertions(+), 13 deletions(-)
 create mode 100644 sqlmesh/migrations/v0074_add_partition_by_time_column_property.py

diff --git a/docs/concepts/models/model_kinds.md b/docs/concepts/models/model_kinds.md
index c9d658d94b..ccc85cf9b9 100644
--- a/docs/concepts/models/model_kinds.md
+++ b/docs/concepts/models/model_kinds.md
@@ -408,6 +408,25 @@ WHERE
   AND event_date BETWEEN @start_ds AND @end_ds; -- `event_date` time column filter automatically added by SQLMesh
 ```
 
+### Partitioning
+
+By default, we ensure that the `time_column` is part of the [partitioned_by](./overview.md#partitioned_by) property of the model so that it forms part of the partition key and allows the database engine to do partition pruning. If it is not explicitly listed in the Model definition, we will automatically add it.
+
+However, this may be undesirable if you want to exclusively partition on another column or you want to partition on something like `month(time_column)` but the engine you're using doesn't support partitioning based on expressions. 
+ +To opt out of this behaviour, you can set `partition_by_time_column false` like so: + +```sql linenums="1" hl_lines="5" +MODEL ( + name db.events, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date, + partition_by_time_column false + ), + partitioned_by (other_col) -- event_date will no longer be automatically added here and the partition key will just be 'other_col' +); +``` + ### Idempotency We recommend making sure incremental by time range model queries are [idempotent](../glossary.md#idempotency) to prevent unexpected results during data [restatement](../plans.md#restatement-plans). diff --git a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py index 18d8e9214c..d3d212e0e1 100644 --- a/sqlmesh/core/model/definition.py +++ b/sqlmesh/core/model/definition.py @@ -1180,12 +1180,18 @@ def full_depends_on(self) -> t.Set[str]: def partitioned_by(self) -> t.List[exp.Expression]: """Columns to partition the model by, including the time column if it is not already included.""" if self.time_column and not self._is_time_column_in_partitioned_by: - return [ - TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x, y: x)( - self.time_column.column, self.columns_to_types - ), - *self.partitioned_by_, - ] + # This allows the user to opt out of automatic time_column injection + # by setting `partition_by_time_column false` on the model kind + if ( + hasattr(self.kind, "partition_by_time_column") + and self.kind.partition_by_time_column + ): + return [ + TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x, y: x)( + self.time_column.column, self.columns_to_types + ), + *self.partitioned_by_, + ] return self.partitioned_by_ @property diff --git a/sqlmesh/core/model/kind.py b/sqlmesh/core/model/kind.py index 112aede3c5..85df3c8f07 100644 --- a/sqlmesh/core/model/kind.py +++ b/sqlmesh/core/model/kind.py @@ -405,6 +405,7 @@ class IncrementalByTimeRangeKind(_IncrementalBy): ) time_column: TimeColumn auto_restatement_intervals: t.Optional[SQLGlotPositiveInt] = 
None + partition_by_time_column: SQLGlotBool = True _time_column_validator = TimeColumn.validator() @@ -415,6 +416,11 @@ def to_expression( expressions=[ *(expressions or []), self.time_column.to_property(kwargs.get("dialect") or ""), + *_properties( + { + "partition_by_time_column": self.partition_by_time_column, + } + ), *( [_property("auto_restatement_intervals", self.auto_restatement_intervals)] if self.auto_restatement_intervals is not None @@ -431,6 +437,7 @@ def data_hash_values(self) -> t.List[t.Optional[str]]: def metadata_hash_values(self) -> t.List[t.Optional[str]]: return [ *super().metadata_hash_values, + str(self.partition_by_time_column), str(self.auto_restatement_intervals) if self.auto_restatement_intervals is not None else None, diff --git a/sqlmesh/migrations/v0074_add_partition_by_time_column_property.py b/sqlmesh/migrations/v0074_add_partition_by_time_column_property.py new file mode 100644 index 0000000000..30fbce46e0 --- /dev/null +++ b/sqlmesh/migrations/v0074_add_partition_by_time_column_property.py @@ -0,0 +1,6 @@ +"""Add 'partition_by_time_column' property to the IncrementalByTimeRange model kind +(default: True to keep the original behaviour)""" + + +def migrate(state_sync, **kwargs): # type: ignore + pass diff --git a/tests/core/test_model.py b/tests/core/test_model.py index 933e2fc4fb..ffbb98b16c 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -345,6 +345,27 @@ def test_partitioned_by( ] == partition_by_output +def test_opt_out_of_time_column_in_partitioned_by(): + expressions = d.parse( + """ + MODEL ( + name db.table, + dialect bigquery, + partitioned_by b, + kind INCREMENTAL_BY_TIME_RANGE( + time_column a, + partition_by_time_column false + ), + ); + + SELECT 1::int AS a, 2::int AS b; + """ + ) + + model = load_sql_based_model(expressions) + assert model.partitioned_by == [exp.to_column('"b"')] + + def test_no_model_statement(tmp_path: Path): # No name inference => MODEL (...) 
is required expressions = d.parse("SELECT 1 AS x") @@ -1298,6 +1319,7 @@ def test_render_definition(): dialect spark, kind INCREMENTAL_BY_TIME_RANGE ( time_column (`a`, 'yyyymmdd'), + partition_by_time_column TRUE, forward_only FALSE, disable_restatement FALSE, on_destructive_change 'ERROR' @@ -6294,6 +6316,7 @@ def test_model_kind_to_expression(): .sql() == """INCREMENTAL_BY_TIME_RANGE ( time_column ("a", '%Y-%m-%d'), +partition_by_time_column TRUE, forward_only FALSE, disable_restatement FALSE, on_destructive_change 'ERROR' @@ -6324,6 +6347,7 @@ def test_model_kind_to_expression(): .sql() == """INCREMENTAL_BY_TIME_RANGE ( time_column ("a", '%Y-%m-%d'), +partition_by_time_column TRUE, batch_size 1, batch_concurrency 2, lookback 3, @@ -7329,6 +7353,7 @@ def test_auto_restatement(): model.kind.to_expression().sql(pretty=True) == """INCREMENTAL_BY_TIME_RANGE ( time_column ("a", '%Y-%m-%d'), + partition_by_time_column TRUE, forward_only FALSE, disable_restatement FALSE, on_destructive_change 'ERROR', @@ -7356,6 +7381,7 @@ def test_auto_restatement(): model.kind.to_expression().sql(pretty=True) == """INCREMENTAL_BY_TIME_RANGE ( time_column ("a", '%Y-%m-%d'), + partition_by_time_column TRUE, auto_restatement_intervals 1, forward_only FALSE, disable_restatement FALSE, diff --git a/tests/core/test_snapshot.py b/tests/core/test_snapshot.py index 091749cc0b..06cd7c159b 100644 --- a/tests/core/test_snapshot.py +++ b/tests/core/test_snapshot.py @@ -130,6 +130,7 @@ def test_json(snapshot: Snapshot): "batch_size": 30, "forward_only": False, "on_destructive_change": "ERROR", + "partition_by_time_column": True, "disable_restatement": False, "dialect": "spark", }, @@ -859,7 +860,7 @@ def test_fingerprint(model: Model, parent_model: Model): original_fingerprint = SnapshotFingerprint( data_hash="1312415267", - metadata_hash="2967945306", + metadata_hash="2906564841", ) assert fingerprint == original_fingerprint @@ -959,7 +960,7 @@ def test_fingerprint_jinja_macros(model: Model): ) 
original_fingerprint = SnapshotFingerprint( data_hash="923305614", - metadata_hash="2967945306", + metadata_hash="2906564841", ) fingerprint = fingerprint_from_node(model, nodes={}) diff --git a/tests/integrations/github/cicd/test_integration.py b/tests/integrations/github/cicd/test_integration.py index 1157a064ed..35afbb89a3 100644 --- a/tests/integrations/github/cicd/test_integration.py +++ b/tests/integrations/github/cicd/test_integration.py @@ -165,7 +165,7 @@ def test_merge_pr_has_non_breaking_change( +++ -@@ -15,7 +15,8 @@ +@@ -16,7 +16,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id, @@ -362,7 +362,7 @@ def test_merge_pr_has_non_breaking_change_diff_start( +++ -@@ -15,7 +15,8 @@ +@@ -16,7 +16,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id, @@ -868,7 +868,7 @@ def test_no_merge_since_no_deploy_signal( +++ -@@ -15,7 +15,8 @@ +@@ -16,7 +16,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id, @@ -1049,7 +1049,7 @@ def test_no_merge_since_no_deploy_signal_no_approvers_defined( +++ -@@ -15,7 +15,8 @@ +@@ -16,7 +16,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id, @@ -1219,7 +1219,7 @@ def test_deploy_comment_pre_categorized( +++ -@@ -15,7 +15,8 @@ +@@ -16,7 +16,8 @@ SELECT CAST(o.waiter_id AS INT) AS waiter_id, diff --git a/tests/schedulers/airflow/test_client.py b/tests/schedulers/airflow/test_client.py index 01f5c59525..524874e787 100644 --- a/tests/schedulers/airflow/test_client.py +++ b/tests/schedulers/airflow/test_client.py @@ -113,6 +113,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot): "time_column": {"column": "`ds`"}, "forward_only": False, "on_destructive_change": "ERROR", + "partition_by_time_column": True, "disable_restatement": False, "dialect": "spark", },