def test_insert_overwrite_by_time_partition_replace_where_pandas(
    make_mocked_engine_adapter: t.Callable, mocker: MockerFixture, make_temp_table_name: t.Callable
):
    mocker.patch(
        "sqlmesh.core.engine_adapter.mssql.MSSQLEngineAdapter.table_exists",
        return_value=False,
    )

    adapter = make_mocked_engine_adapter(MSSQLEngineAdapter)
    adapter.INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.REPLACE_WHERE

    temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.EngineAdapter._get_temp_table")
    table_name = "test_table"
    temp_table_id = "abcdefgh"
    temp_table_mock.return_value = make_temp_table_name(table_name, temp_table_id)

    df = pd.DataFrame({"a": [1, 2], "ds": ["2022-01-01", "2022-01-02"]})
    adapter.insert_overwrite_by_time_partition(
        table_name,
        df,
        start="2022-01-01",
        end="2022-01-02",
        time_formatter=lambda x, _: exp.Literal.string(to_ds(x)),
        time_column="ds",
        target_columns_to_types={
            "a": exp.DataType.build("INT"),
            "ds": exp.DataType.build("STRING"),
        },
    )
    adapter._connection_pool.get().bulk_copy.assert_called_with(
        f"__temp_test_table_{temp_table_id}", [(1, "2022-01-01"), (2, "2022-01-02")]
    )

    assert to_sql_calls(adapter) == [
        f"""IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '__temp_test_table_{temp_table_id}') EXEC('CREATE TABLE [__temp_test_table_{temp_table_id}] ([a] INTEGER, [ds] VARCHAR(MAX))');""",
        f"""MERGE INTO [test_table] AS [__MERGE_TARGET__] USING (SELECT [a] AS [a], [ds] AS [ds] FROM (SELECT CAST([a] AS INTEGER) AS [a], CAST([ds] AS VARCHAR(MAX)) AS [ds] FROM [__temp_test_table_{temp_table_id}]) AS [_subquery] WHERE [ds] BETWEEN '2022-01-01' AND '2022-01-02') AS [__MERGE_SOURCE__] ON (1 = 0) WHEN NOT MATCHED BY SOURCE AND [ds] BETWEEN '2022-01-01' AND '2022-01-02' THEN DELETE WHEN NOT MATCHED THEN INSERT ([a], [ds]) VALUES ([a], [ds]);""",
        f"DROP TABLE IF EXISTS [__temp_test_table_{temp_table_id}];",
    ]
This test didn't make sense. It changed the MSSQL engine adapter to use REPLACE_WHERE and then showed that doing so had no effect. It would now have an effect, but that is expected. I think it was likely a copy/paste mistake from other tests, made without understanding the intent.
f3c9410 to 9fd4e8e
erindru left a comment
I guess it was implemented separately to begin with because it's pretty much only MSSQL that supports WHEN NOT MATCHED [BY SOURCE], which is what makes this actually work in practice.
But consolidating it does simplify things somewhat.
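For readers unfamiliar with the pattern, here is a minimal sketch of the MERGE shape being discussed, mirroring the SQL asserted in the test above. The helper name `build_replace_where_merge` and its parameters are hypothetical and are not part of SQLMesh; the real adapter builds this statement through its own expression machinery.

```python
from typing import Sequence


def build_replace_where_merge(
    target: str, source_query: str, columns: Sequence[str], where: str
) -> str:
    """Hypothetical helper: render a T-SQL MERGE that overwrites a range of rows.

    ON (1 = 0) never matches, so every source row is "not matched" and gets
    inserted, while target rows inside the `where` range are "not matched by
    source" and get deleted.
    """
    cols = ", ".join(f"[{c}]" for c in columns)
    return (
        f"MERGE INTO [{target}] AS [__MERGE_TARGET__] "
        f"USING ({source_query}) AS [__MERGE_SOURCE__] "
        "ON (1 = 0) "
        f"WHEN NOT MATCHED BY SOURCE AND {where} THEN DELETE "
        f"WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({cols});"
    )


sql = build_replace_where_merge(
    "test_table",
    "SELECT [a], [ds] FROM [tmp]",
    ["a", "ds"],
    "[ds] BETWEEN '2022-01-01' AND '2022-01-02'",
)
print(sql)
```

The `WHEN NOT MATCHED BY SOURCE` clause is what limits this approach to engines that support it, which is the point made in the comment above.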
Hey @eakmanrq! I believe this change is causing my plan to attempt to create tables twice. That isn't a huge deal in itself, but it means that my macros that create indexes on the tables fire twice and fail, since the index already exists on the table.
I'm not entirely sure what causes it, but in my project all queries without dependencies on partitioned queries seem to run first, and then a second "executing model batches" step runs the partitioned models and their downstream dependencies. If I recall correctly, this has been the behavior for a good while. However, starting with 0.219.0, creation of some tables is attempted again in this second batch. This causes my macro to fire (evaluation stage "create"), even though the SQL didn't execute due to the
So I get log outputs like this:
Log row 245:
Log row 827:
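Independent of the root cause, one possible way to make an index-creating macro tolerate being fired twice is to guard the statement on the SQL Server side. The sketch below is an assumption about how such a guard could look and is not the commenter's macro; the helper name `guarded_create_index` and the example table, index, and column names are hypothetical. It relies on the `sys.indexes` catalog view and the `OBJECT_ID` function.

```python
from typing import Sequence


def guarded_create_index(table: str, index: str, columns: Sequence[str]) -> str:
    """Hypothetical helper: render an idempotent CREATE INDEX for SQL Server.

    The index is only created when sys.indexes has no entry with this name
    on the given table, so running the statement a second time is a no-op.
    """
    cols = ", ".join(f"[{c}]" for c in columns)
    return (
        "IF NOT EXISTS (SELECT 1 FROM sys.indexes "
        f"WHERE name = N'{index}' AND object_id = OBJECT_ID(N'{table}')) "
        f"CREATE INDEX [{index}] ON [{table}] ({cols});"
    )


stmt = guarded_create_index("test_table", "ix_test_table_ds", ["ds"])
print(stmt)
```

This only works around the symptom; it does not explain why the create stage runs a second time.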
Deprecating InsertOverwriteWithMergeMixin and instead relying solely on INSERT_OVERWRITE_STRATEGY. Basically, we previously had two ways of doing the same thing, so this consolidates them into a single way.
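As a rough illustration of the consolidation idea, the toy sketch below selects the overwrite behavior through a single class attribute instead of a mixin that overrides the method. The `Strategy` enum, `ToyAdapter`, and `ToyMSSQLAdapter` are hypothetical stand-ins and do not reproduce SQLMesh's actual classes or enum members.

```python
from enum import Enum, auto
from typing import List


class Strategy(Enum):
    """Hypothetical stand-in for InsertOverwriteStrategy."""

    DELETE_INSERT = auto()
    MERGE = auto()


class ToyAdapter:
    # A single switch decides how an overwrite is carried out.
    INSERT_OVERWRITE_STRATEGY = Strategy.DELETE_INSERT

    def insert_overwrite(self, table: str, where: str) -> List[str]:
        """Return the (abbreviated) statements this adapter would run."""
        if self.INSERT_OVERWRITE_STRATEGY is Strategy.MERGE:
            return [
                f"MERGE INTO {table} ... "
                f"WHEN NOT MATCHED BY SOURCE AND {where} THEN DELETE ..."
            ]
        return [f"DELETE FROM {table} WHERE {where}", f"INSERT INTO {table} ..."]


class ToyMSSQLAdapter(ToyAdapter):
    # No mixin needed: the subclass only declares its strategy.
    INSERT_OVERWRITE_STRATEGY = Strategy.MERGE


default_calls = ToyAdapter().insert_overwrite("t", "ds = '2022-01-01'")
mssql_calls = ToyMSSQLAdapter().insert_overwrite("t", "ds = '2022-01-01'")
print(default_calls)
print(mssql_calls)
```

With one dispatch point, an engine-specific adapter only has to declare its strategy, which is the simplification the description refers to.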