Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/concepts/models/model_kinds.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,25 @@ WHERE
AND event_date BETWEEN @start_ds AND @end_ds; -- `event_date` time column filter automatically added by SQLMesh
```

### Partitioning

By default, we ensure that the `time_column` is part of the [partitioned_by](./overview.md#partitioned_by) property of the model so that it forms part of the partition key and allows the database engine to do partition pruning. If it is not explicitly listed in the Model definition, we will automatically add it.

However, this may be undesirable if you want to exclusively partition on another column or you want to partition on something like `month(time_column)` but the engine you're using doesnt support partitioning based on expressions.

To opt out of this behaviour, you can set `partition_by_time_column false` like so:

```sql linenums="1" hl_lines="5"
MODEL (
name db.events,
kind INCREMENTAL_BY_TIME_RANGE (
time_column event_date,
partition_by_time_column false
),
partitioned_by (other_col) -- event_date will no longer be automatically added here and the partition key will just be 'other_col'
);
```

### Idempotency
We recommend making sure incremental by time range model queries are [idempotent](../glossary.md#idempotency) to prevent unexpected results during data [restatement](../plans.md#restatement-plans).

Expand Down
18 changes: 12 additions & 6 deletions sqlmesh/core/model/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,12 +1180,18 @@ def full_depends_on(self) -> t.Set[str]:
def partitioned_by(self) -> t.List[exp.Expression]:
"""Columns to partition the model by, including the time column if it is not already included."""
if self.time_column and not self._is_time_column_in_partitioned_by:
return [
TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x, y: x)(
self.time_column.column, self.columns_to_types
),
*self.partitioned_by_,
]
# This allows the user to opt out of automatic time_column injection
# by setting `partition_by_time_column false` on the model kind
if (
hasattr(self.kind, "partition_by_time_column")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is deliberately a hasattr check so I can utilize this property on a custom materialization

and self.kind.partition_by_time_column
):
return [
TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x, y: x)(
self.time_column.column, self.columns_to_types
),
*self.partitioned_by_,
]
return self.partitioned_by_

@property
Expand Down
7 changes: 7 additions & 0 deletions sqlmesh/core/model/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ class IncrementalByTimeRangeKind(_IncrementalBy):
)
time_column: TimeColumn
auto_restatement_intervals: t.Optional[SQLGlotPositiveInt] = None
partition_by_time_column: SQLGlotBool = True

_time_column_validator = TimeColumn.validator()

Expand All @@ -415,6 +416,11 @@ def to_expression(
expressions=[
*(expressions or []),
self.time_column.to_property(kwargs.get("dialect") or ""),
*_properties(
{
"partition_by_time_column": self.partition_by_time_column,
}
),
*(
[_property("auto_restatement_intervals", self.auto_restatement_intervals)]
if self.auto_restatement_intervals is not None
Expand All @@ -431,6 +437,7 @@ def data_hash_values(self) -> t.List[t.Optional[str]]:
def metadata_hash_values(self) -> t.List[t.Optional[str]]:
return [
*super().metadata_hash_values,
str(self.partition_by_time_column),
str(self.auto_restatement_intervals)
if self.auto_restatement_intervals is not None
else None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Add 'partition_by_time_column' property to the IncrementalByTimeRange model kind
(default: True to keep the original behaviour)"""


def migrate(state_sync, **kwargs): # type: ignore
pass
26 changes: 26 additions & 0 deletions tests/core/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,27 @@ def test_partitioned_by(
] == partition_by_output


def test_opt_out_of_time_column_in_partitioned_by():
expressions = d.parse(
"""
MODEL (
name db.table,
dialect bigquery,
partitioned_by b,
kind INCREMENTAL_BY_TIME_RANGE(
time_column a,
partition_by_time_column false
),
);

SELECT 1::int AS a, 2::int AS b;
"""
)

model = load_sql_based_model(expressions)
assert model.partitioned_by == [exp.to_column('"b"')]


def test_no_model_statement(tmp_path: Path):
# No name inference => MODEL (...) is required
expressions = d.parse("SELECT 1 AS x")
Expand Down Expand Up @@ -1298,6 +1319,7 @@ def test_render_definition():
dialect spark,
kind INCREMENTAL_BY_TIME_RANGE (
time_column (`a`, 'yyyymmdd'),
partition_by_time_column TRUE,
forward_only FALSE,
disable_restatement FALSE,
on_destructive_change 'ERROR'
Expand Down Expand Up @@ -6294,6 +6316,7 @@ def test_model_kind_to_expression():
.sql()
== """INCREMENTAL_BY_TIME_RANGE (
time_column ("a", '%Y-%m-%d'),
partition_by_time_column TRUE,
forward_only FALSE,
disable_restatement FALSE,
on_destructive_change 'ERROR'
Expand Down Expand Up @@ -6324,6 +6347,7 @@ def test_model_kind_to_expression():
.sql()
== """INCREMENTAL_BY_TIME_RANGE (
time_column ("a", '%Y-%m-%d'),
partition_by_time_column TRUE,
batch_size 1,
batch_concurrency 2,
lookback 3,
Expand Down Expand Up @@ -7329,6 +7353,7 @@ def test_auto_restatement():
model.kind.to_expression().sql(pretty=True)
== """INCREMENTAL_BY_TIME_RANGE (
time_column ("a", '%Y-%m-%d'),
partition_by_time_column TRUE,
forward_only FALSE,
disable_restatement FALSE,
on_destructive_change 'ERROR',
Expand Down Expand Up @@ -7356,6 +7381,7 @@ def test_auto_restatement():
model.kind.to_expression().sql(pretty=True)
== """INCREMENTAL_BY_TIME_RANGE (
time_column ("a", '%Y-%m-%d'),
partition_by_time_column TRUE,
auto_restatement_intervals 1,
forward_only FALSE,
disable_restatement FALSE,
Expand Down
5 changes: 3 additions & 2 deletions tests/core/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def test_json(snapshot: Snapshot):
"batch_size": 30,
"forward_only": False,
"on_destructive_change": "ERROR",
"partition_by_time_column": True,
"disable_restatement": False,
"dialect": "spark",
},
Expand Down Expand Up @@ -859,7 +860,7 @@ def test_fingerprint(model: Model, parent_model: Model):

original_fingerprint = SnapshotFingerprint(
data_hash="1312415267",
metadata_hash="2967945306",
metadata_hash="2906564841",
)

assert fingerprint == original_fingerprint
Expand Down Expand Up @@ -959,7 +960,7 @@ def test_fingerprint_jinja_macros(model: Model):
)
original_fingerprint = SnapshotFingerprint(
data_hash="923305614",
metadata_hash="2967945306",
metadata_hash="2906564841",
)

fingerprint = fingerprint_from_node(model, nodes={})
Expand Down
10 changes: 5 additions & 5 deletions tests/integrations/github/cicd/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def test_merge_pr_has_non_breaking_change(

+++

@@ -15,7 +15,8 @@
@@ -16,7 +16,8 @@

SELECT
CAST(o.waiter_id AS INT) AS waiter_id,
Expand Down Expand Up @@ -362,7 +362,7 @@ def test_merge_pr_has_non_breaking_change_diff_start(

+++

@@ -15,7 +15,8 @@
@@ -16,7 +16,8 @@

SELECT
CAST(o.waiter_id AS INT) AS waiter_id,
Expand Down Expand Up @@ -868,7 +868,7 @@ def test_no_merge_since_no_deploy_signal(

+++

@@ -15,7 +15,8 @@
@@ -16,7 +16,8 @@

SELECT
CAST(o.waiter_id AS INT) AS waiter_id,
Expand Down Expand Up @@ -1049,7 +1049,7 @@ def test_no_merge_since_no_deploy_signal_no_approvers_defined(

+++

@@ -15,7 +15,8 @@
@@ -16,7 +16,8 @@

SELECT
CAST(o.waiter_id AS INT) AS waiter_id,
Expand Down Expand Up @@ -1219,7 +1219,7 @@ def test_deploy_comment_pre_categorized(

+++

@@ -15,7 +15,8 @@
@@ -16,7 +16,8 @@

SELECT
CAST(o.waiter_id AS INT) AS waiter_id,
Expand Down
1 change: 1 addition & 0 deletions tests/schedulers/airflow/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def test_apply_plan(mocker: MockerFixture, snapshot: Snapshot):
"time_column": {"column": "`ds`"},
"forward_only": False,
"on_destructive_change": "ERROR",
"partition_by_time_column": True,
"disable_restatement": False,
"dialect": "spark",
},
Expand Down