From 7a49bd3cfa10011d0430e1f9de4ce6ea37d19571 Mon Sep 17 00:00:00 2001
From: Azat Khuzhin
Date: Sun, 1 Jun 2025 06:41:27 +0000
Subject: [PATCH 1/3] Merge pull request #79369 from
 ilejn/ignore_error_distributed_ddl_queue

Ignore parse error in system.distributed_ddl_queue

conflict resolved
---
 .../System/StorageSystemDDLWorkerQueue.cpp    |  30 +++-
 .../configs/remote_servers.xml                |   1 +
 .../test_system_ddl_worker_queue/test.py      | 128 +++++++++++++++---
 3 files changed, 133 insertions(+), 26 deletions(-)

diff --git a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp
index ae1a233ea81e..23819d0f8cd2 100644
--- a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp
+++ b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp
@@ -21,6 +21,11 @@ namespace fs = std::filesystem;
 
 namespace DB
 {
+namespace ErrorCodes
+{
+    extern const int SYNTAX_ERROR;
+}
+
 enum class Status : uint8_t
 {
     INACTIVE,
@@ -54,7 +59,7 @@ ColumnsDescription StorageSystemDDLWorkerQueue::getColumnsDescription()
         {"entry_version", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt8>()), "Version of the entry."},
         {"initiator_host", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()), "Host that initiated the DDL operation."},
         {"initiator_port", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt16>()), "Port used by the initiator."},
-        {"cluster", std::make_shared<DataTypeString>(), "Cluster name."},
+        {"cluster", std::make_shared<DataTypeString>(), "Cluster name, empty if not determined."},
         {"query", std::make_shared<DataTypeString>(), "Query executed."},
         {"settings", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()), "Settings used in the DDL operation."},
         {"query_create_time", std::make_shared<DataTypeDateTime>(), "Query created time."},
@@ -77,10 +82,25 @@ static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task)
     String description = fmt::format("from {}", task.entry_path);
     ParserQuery parser_query(end, settings.allow_settings_after_format_in_insert);
 
-    ASTPtr query = parseQuery(parser_query, begin, end, description,
-                              settings.max_query_size,
-                              settings.max_parser_depth,
-                              settings.max_parser_backtracks);
+    ASTPtr query;
+
+    try
+    {
+        query = parseQuery(parser_query, begin, end, description,
+                           settings.max_query_size,
+                           settings.max_parser_depth,
+                           settings.max_parser_backtracks);
+    }
+    catch (const Exception & e)
+    {
+        LOG_INFO(getLogger("StorageSystemDDLWorkerQueue"), "Failed to determine cluster");
+        if (e.code() == ErrorCodes::SYNTAX_ERROR)
+        {
+            /// ignore parse error and present available information
+            return "";
+        }
+        throw;
+    }
 
     String cluster_name;
     if (const auto * query_on_cluster = dynamic_cast<const ASTQueryWithOnCluster *>(query.get()))
diff --git a/tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml b/tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml
index 791af83a2d6d..f6392caf5e51 100644
--- a/tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml
+++ b/tests/integration/test_system_ddl_worker_queue/configs/remote_servers.xml
@@ -25,4 +25,5 @@
             </shard>
         </test_cluster>
     </remote_servers>
+    <allow_zookeeper_write>1</allow_zookeeper_write>
 </clickhouse>
diff --git a/tests/integration/test_system_ddl_worker_queue/test.py b/tests/integration/test_system_ddl_worker_queue/test.py
index 4659e5b92e84..1bebf709a821 100644
--- a/tests/integration/test_system_ddl_worker_queue/test.py
+++ b/tests/integration/test_system_ddl_worker_queue/test.py
@@ -1,4 +1,5 @@
 import pytest
+import time
 
 from helpers.cluster import ClickHouseCluster
 
@@ -25,46 +26,131 @@ def started_cluster():
     try:
         cluster.start()
 
-        for i, node in enumerate([node1, node2]):
-            node.query("CREATE DATABASE testdb")
-            node.query(
-                """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;""".format(
-                    i
-                )
-            )
-        for i, node in enumerate([node3, node4]):
-            node.query("CREATE DATABASE testdb")
-            node.query(
-                """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;""".format(
-                    i
-                )
-            )
         yield cluster
 
     finally:
         cluster.shutdown()
 
 
+def maintain_test_table(test_table):
+    tmark = time.time()  # to guarantee ZK path uniqueness
+
+    for i, node in enumerate([node1, node2]):
+        node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
+        node.query("DROP DATABASE IF EXISTS testdb")
+
+        node.query("CREATE DATABASE testdb")
+        node.query(
+            f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}1-{tmark}', '{i}') ORDER BY id;"
+        )
+    for i, node in enumerate([node3, node4]):
+        node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC")
+        node.query("DROP DATABASE IF EXISTS testdb")
+
+        node.query("CREATE DATABASE testdb")
+        node.query(
+            f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}2-{tmark}', '{i}') ORDER BY id;"
+        )
+
+
 def test_distributed_ddl_queue(started_cluster):
+    test_table = "test_table"
+    maintain_test_table(test_table)
     node1.query(
-        "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
+        f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
     )
     node3.query(
-        "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)"
+        f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)"
     )
-    node2.query("SYSTEM SYNC REPLICA testdb.test_table")
-    node4.query("SYSTEM SYNC REPLICA testdb.test_table")
+    node2.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
+    node4.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
 
     node1.query(
-        "ALTER TABLE testdb.test_table ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
+        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val",
         settings={"replication_alter_partitions_sync": "2"},
     )
     for node in nodes:
-        node.query("SYSTEM SYNC REPLICA testdb.test_table")
-        assert node.query("SELECT somecolumn FROM testdb.test_table LIMIT 1") == "0\n"
+        node.query(f"SYSTEM SYNC REPLICA testdb.{test_table}")
+        assert (
+            node.query(f"SELECT somecolumn FROM testdb.{test_table} LIMIT 1") == "0\n"
+        )
         assert (
             node.query(
                 "SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')"
            )
             == "ok\n"
         )
+
+    node1.query(
+        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somecolumn",
+        settings={"replication_alter_partitions_sync": "2"},
+    )
+
+
+def test_distributed_ddl_rubbish(started_cluster):
+    test_table = "test_table_rubbish"
+    maintain_test_table(test_table)
+    node1.query(
+        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somenewcolumn UInt8 AFTER val",
+        settings={"replication_alter_partitions_sync": "2"},
+    )
+
+    zk_content = node1.query(
+        "SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true",
+        parse=True,
+    ).to_dict("records")
+
+    original_query = ""
+    new_query = "query-artificial-" + str(time.monotonic_ns())
+
+    # Copy information about query (one that added 'somenewcolumn') with new query ID
+    # and broken query text (TABLE => TUBLE)
+    for row in zk_content:
+        if row["value"].find("somenewcolumn") >= 0:
+            original_query = row["name"]
+            break
+
+    rows_to_insert = []
+
+    for row in zk_content:
+        if row["name"] == original_query:
+            rows_to_insert.append(
+                {
+                    "name": new_query,
+                    "path": row["path"],
+                    "value": row["value"].replace("TABLE", "TUBLE"),
+                }
+            )
+            continue
+        pos = row["path"].find(original_query)
+        if pos >= 0:
+            rows_to_insert.append(
+                {
+                    "name": row["name"],
+                    "path": row["path"].replace(original_query, new_query),
+                    "value": row["value"],
+                }
+            )
+
+    # Ingest it to ZK
+    for row in rows_to_insert:
+        node1.query(
+            "insert into system.zookeeper (name, path, value) values ('{}', '{}', '{}')".format(
+                f'{row["name"]}', f'{row["path"]}', f'{row["value"]}'
+            )
+        )
+
+    # Ensure that data is visible via system.distributed_ddl_queue
+    assert (
+        int(
+            node1.query(
+                f"SELECT count(1) FROM system.distributed_ddl_queue WHERE entry='{new_query}' AND cluster=''"
+            )
+        )
+        == 4
+    )
+
+    node1.query(
+        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn",
+        settings={"replication_alter_partitions_sync": "2"},
+    )

From 34e9bcf53892155467d296433830e665004e3df0 Mon Sep 17 00:00:00 2001
From: Ilya Golshtein
Date: Thu, 12 Jun 2025 13:24:26 +0000
Subject: [PATCH 2/3] Adapt
 test_system_ddl_worker_queue/test.py::test_distributed_ddl_rubbish to 24.8

---
 .../test_system_ddl_worker_queue/test.py | 22 ++++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/tests/integration/test_system_ddl_worker_queue/test.py b/tests/integration/test_system_ddl_worker_queue/test.py
index 1bebf709a821..a743dbe3f396 100644
--- a/tests/integration/test_system_ddl_worker_queue/test.py
+++ b/tests/integration/test_system_ddl_worker_queue/test.py
@@ -1,5 +1,8 @@
 import pytest
 import time
+from io import StringIO
+import csv
+import logging
 
 from helpers.cluster import ClickHouseCluster
 
@@ -95,10 +98,13 @@ def test_distributed_ddl_rubbish(started_cluster):
         settings={"replication_alter_partitions_sync": "2"},
     )
 
-    zk_content = node1.query(
-        "SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true",
-        parse=True,
-    ).to_dict("records")
+    zk_content_raw = node1.query(
+        "SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true FORMAT TabSeparatedWithNames",
+        # parse=True,
+    )  # .to_dict("records")
+
+    dict_reader = csv.DictReader(StringIO(zk_content_raw), delimiter='\t')
+    zk_content = [row for row in dict_reader]
 
     original_query = ""
     new_query = "query-artificial-" + str(time.monotonic_ns())
@@ -150,7 +156,7 @@
         == 4
     )
 
-    node1.query(
-        f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn",
-        settings={"replication_alter_partitions_sync": "2"},
-    )
+    # node1.query(
+    #     f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn",
+    #     settings={"replication_alter_partitions_sync": "2"},
+    # )

From 7943791bbb540719ad8440bb0f9148feee30cf57 Mon Sep 17 00:00:00 2001
From: Andrey Zvonov
Date: Wed, 18 Jun 2025 09:20:26 +0300
Subject: [PATCH 3/3] Poke CI once more
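---

A minimal sketch of the behavior patch 1 introduces, reusing the pytest fixtures
from test.py above (node1, started_cluster): once a malformed entry such as the
artificial "TUBLE" one sits under /clickhouse/task_queue/ddl, reading
system.distributed_ddl_queue returns that entry with an empty cluster instead of
failing the whole SELECT with SYNTAX_ERROR.

    # Sketch only; assumes the integration-test fixtures defined above.
    # Before patch 1 this SELECT could throw while parsing the rubbish entry;
    # afterwards the entry is listed with cluster = '' and its raw query text.
    rubbish = node1.query(
        "SELECT entry, cluster, query FROM system.distributed_ddl_queue WHERE cluster = ''"
    )
    print(rubbish)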