diff --git a/sqlmesh/core/config/model.py b/sqlmesh/core/config/model.py index 5406a5497b..aeefdf2557 100644 --- a/sqlmesh/core/config/model.py +++ b/sqlmesh/core/config/model.py @@ -45,6 +45,7 @@ class ModelDefaultsConfig(BaseConfig): allow_partials: Whether the models can process partial (incomplete) data intervals. enabled: Whether the models are enabled. interval_unit: The temporal granularity of the models data intervals. By default computed from cron. + batch_concurrency: The maximum number of batches that can run concurrently for an incremental model. pre_statements: The list of SQL statements that get executed before a model runs. post_statements: The list of SQL statements that get executed before a model runs. on_virtual_update: The list of SQL statements to be executed after the virtual update. @@ -69,6 +70,7 @@ class ModelDefaultsConfig(BaseConfig): interval_unit: t.Optional[t.Union[str, IntervalUnit]] = None enabled: t.Optional[t.Union[str, bool]] = None formatting: t.Optional[t.Union[str, bool]] = None + batch_concurrency: t.Optional[int] = None pre_statements: t.Optional[t.List[t.Union[str, exp.Expression]]] = None post_statements: t.Optional[t.List[t.Union[str, exp.Expression]]] = None on_virtual_update: t.Optional[t.List[t.Union[str, exp.Expression]]] = None diff --git a/sqlmesh/core/model/kind.py b/sqlmesh/core/model/kind.py index cc4c6f0826..ad5197a73a 100644 --- a/sqlmesh/core/model/kind.py +++ b/sqlmesh/core/model/kind.py @@ -1105,6 +1105,18 @@ def create_model_kind(v: t.Any, dialect: str, defaults: t.Dict[str, t.Any]) -> M ): props[on_change_property] = defaults.get(on_change_property) + # only pass the batch_concurrency user default to models inheriting from _IncrementalBy + # that don't explicitly set it in the model definition, but ignore subclasses of _IncrementalBy + # that hardcode a specific batch_concurrency + if issubclass(kind_type, _IncrementalBy): + BATCH_CONCURRENCY: t.Final = "batch_concurrency" + if ( + props.get(BATCH_CONCURRENCY) is None + and defaults.get(BATCH_CONCURRENCY) is not None + and kind_type.all_field_infos()[BATCH_CONCURRENCY].default is None + ): + props[BATCH_CONCURRENCY] = defaults.get(BATCH_CONCURRENCY) + if kind_type == CustomKind: # load the custom materialization class and check if it uses a custom kind type from sqlmesh.core.snapshot.evaluator import get_custom_materialization_type diff --git a/tests/core/test_model.py b/tests/core/test_model.py index f1a9eeb0b9..f6fc448b79 100644 --- a/tests/core/test_model.py +++ b/tests/core/test_model.py @@ -7583,6 +7583,107 @@ def test_forward_only_on_destructive_change_config() -> None: assert context_model.on_destructive_change.is_allow +def test_batch_concurrency_config() -> None: + # No batch_concurrency default for incremental models + config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb")) + context = Context(config=config) + + expressions = d.parse( + """ + MODEL ( + name memory.db.table, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column c + ), + ); + SELECT a, b, c FROM source_table; + """ + ) + model = load_sql_based_model(expressions, defaults=config.model_defaults.dict()) + context.upsert_model(model) + context_model = context.get_model("memory.db.table") + assert context_model.batch_concurrency is None + + # batch_concurrency specified in model defaults applies to incremental models + config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb", batch_concurrency=5)) + context = Context(config=config) + + expressions = d.parse( + """ + MODEL ( + name memory.db.table, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column c + ), + ); + SELECT a, b, c FROM source_table; + """ + ) + model = load_sql_based_model(expressions, defaults=config.model_defaults.dict()) + context.upsert_model(model) + context_model = context.get_model("memory.db.table") + assert context_model.batch_concurrency == 5 + + # batch_concurrency specified in model definition overrides default + config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb", batch_concurrency=5)) + context = Context(config=config) + + expressions = d.parse( + """ + MODEL ( + name memory.db.table, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column c, + batch_concurrency 10 + ), + ); + SELECT a, b, c FROM source_table; + """ + ) + model = load_sql_based_model(expressions, defaults=config.model_defaults.dict()) + context.upsert_model(model) + context_model = context.get_model("memory.db.table") + assert context_model.batch_concurrency == 10 + + # batch_concurrency default does not apply to non-incremental models + config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb", batch_concurrency=5)) + context = Context(config=config) + + expressions = d.parse( + """ + MODEL ( + name memory.db.table, + kind FULL, + ); + SELECT a, b, c FROM source_table; + """ + ) + model = load_sql_based_model(expressions, defaults=config.model_defaults.dict()) + context.upsert_model(model) + context_model = context.get_model("memory.db.table") + assert context_model.batch_concurrency is None + + # batch_concurrency default does not apply to INCREMENTAL_BY_UNIQUE_KEY models + config = Config(model_defaults=ModelDefaultsConfig(dialect="duckdb", batch_concurrency=5)) + context = Context(config=config) + + expressions = d.parse( + """ + MODEL ( + name memory.db.table, + kind INCREMENTAL_BY_UNIQUE_KEY ( + unique_key a + ), + ); + SELECT a, b, c FROM source_table; + """ + ) + model = load_sql_based_model(expressions, defaults=config.model_defaults.dict()) + context.upsert_model(model) + context_model = context.get_model("memory.db.table") + assert context_model.batch_concurrency == 1 + + def test_model_meta_on_additive_change_property() -> None: """Test that ModelMeta has on_additive_change property that works like on_destructive_change.""" from sqlmesh.core.model.kind import IncrementalByTimeRangeKind, OnAdditiveChange