Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sqlmesh/core/config/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ModelDefaultsConfig(BaseConfig):
allow_partials: Whether the models can process partial (incomplete) data intervals.
enabled: Whether the models are enabled.
interval_unit: The temporal granularity of the models data intervals. By default computed from cron.
batch_concurrency: The maximum number of batches that can run concurrently for an incremental model.
pre_statements: The list of SQL statements that get executed before a model runs.
    post_statements: The list of SQL statements that get executed after a model runs.
on_virtual_update: The list of SQL statements to be executed after the virtual update.
Expand All @@ -69,6 +70,7 @@ class ModelDefaultsConfig(BaseConfig):
interval_unit: t.Optional[t.Union[str, IntervalUnit]] = None
enabled: t.Optional[t.Union[str, bool]] = None
formatting: t.Optional[t.Union[str, bool]] = None
batch_concurrency: t.Optional[int] = None
pre_statements: t.Optional[t.List[t.Union[str, exp.Expression]]] = None
post_statements: t.Optional[t.List[t.Union[str, exp.Expression]]] = None
on_virtual_update: t.Optional[t.List[t.Union[str, exp.Expression]]] = None
Expand Down
12 changes: 12 additions & 0 deletions sqlmesh/core/model/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,18 @@ def create_model_kind(v: t.Any, dialect: str, defaults: t.Dict[str, t.Any]) -> M
):
props[on_change_property] = defaults.get(on_change_property)

# only pass the batch_concurrency user default to models inheriting from _IncrementalBy
# that don't explicitly set it in the model definition, but ignore subclasses of _IncrementalBy
# that hardcode a specific batch_concurrency
if issubclass(kind_type, _IncrementalBy):
BATCH_CONCURRENCY: t.Final = "batch_concurrency"
if (
props.get(BATCH_CONCURRENCY) is None
and defaults.get(BATCH_CONCURRENCY) is not None
and kind_type.all_field_infos()[BATCH_CONCURRENCY].default is None
):
props[BATCH_CONCURRENCY] = defaults.get(BATCH_CONCURRENCY)

if kind_type == CustomKind:
# load the custom materialization class and check if it uses a custom kind type
from sqlmesh.core.snapshot.evaluator import get_custom_materialization_type
Expand Down
101 changes: 101 additions & 0 deletions tests/core/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7583,6 +7583,107 @@ def test_forward_only_on_destructive_change_config() -> None:
assert context_model.on_destructive_change.is_allow


def test_batch_concurrency_config() -> None:
    """The ``batch_concurrency`` model default applies only to incremental kinds
    that don't hardcode it, and an explicit model-level value always wins.

    Covered scenarios:
      * no default configured -> incremental model has no batch_concurrency
      * default configured -> inherited by INCREMENTAL_BY_TIME_RANGE
      * value in the model definition -> overrides the default
      * FULL kind -> default is ignored (not an incremental kind)
      * INCREMENTAL_BY_UNIQUE_KEY -> keeps its hardcoded value of 1
    """

    def load(config: Config, model_ddl: str):
        # Load the model through a fresh Context so user defaults are applied
        # exactly as they would be in a real project, then return the
        # effective batch_concurrency of the loaded model.
        context = Context(config=config)
        expressions = d.parse(model_ddl)
        model = load_sql_based_model(expressions, defaults=config.model_defaults.dict())
        context.upsert_model(model)
        return context.get_model("memory.db.table").batch_concurrency

    incremental_by_time_range = """
        MODEL (
            name memory.db.table,
            kind INCREMENTAL_BY_TIME_RANGE (
                time_column c
            ),
        );
        SELECT a, b, c FROM source_table;
    """

    # No batch_concurrency default for incremental models.
    no_default = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb"))
    assert load(no_default, incremental_by_time_range) is None

    # batch_concurrency specified in model defaults applies to incremental models.
    with_default = Config(
        model_defaults=ModelDefaultsConfig(dialect="duckdb", batch_concurrency=5)
    )
    assert load(with_default, incremental_by_time_range) == 5

    # batch_concurrency specified in the model definition overrides the default.
    assert (
        load(
            with_default,
            """
            MODEL (
                name memory.db.table,
                kind INCREMENTAL_BY_TIME_RANGE (
                    time_column c,
                    batch_concurrency 10
                ),
            );
            SELECT a, b, c FROM source_table;
            """,
        )
        == 10
    )

    # The default does not apply to non-incremental models.
    assert (
        load(
            with_default,
            """
            MODEL (
                name memory.db.table,
                kind FULL,
            );
            SELECT a, b, c FROM source_table;
            """,
        )
        is None
    )

    # INCREMENTAL_BY_UNIQUE_KEY hardcodes batch_concurrency=1, so the user
    # default must not override it.
    assert (
        load(
            with_default,
            """
            MODEL (
                name memory.db.table,
                kind INCREMENTAL_BY_UNIQUE_KEY (
                    unique_key a
                ),
            );
            SELECT a, b, c FROM source_table;
            """,
        )
        == 1
    )


def test_model_meta_on_additive_change_property() -> None:
"""Test that ModelMeta has on_additive_change property that works like on_destructive_change."""
from sqlmesh.core.model.kind import IncrementalByTimeRangeKind, OnAdditiveChange
Expand Down