From bc7f70e2e91ceb6ffb7e0d76e8fc6c5757ba040f Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Sat, 13 Sep 2025 10:28:09 -0300 Subject: [PATCH 01/14] Merge pull request #1009 from Altinity/simple_export_part simple export part --- src/Access/Common/AccessType.h | 1 + src/CMakeLists.txt | 1 + src/Core/ServerSettings.cpp | 1 + src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 9 + src/Databases/DatabaseReplicated.cpp | 3 +- src/Disks/ObjectStorages/IObjectStorage.h | 2 + src/Interpreters/Context.cpp | 11 + src/Interpreters/Context.h | 1 + src/Interpreters/DDLWorker.cpp | 3 +- src/Interpreters/InterpreterAlterQuery.cpp | 6 + src/Interpreters/PartLog.cpp | 4 +- src/Interpreters/PartLog.h | 1 + src/Interpreters/executeDDLQueryOnCluster.cpp | 2 + src/Parsers/ASTAlterQuery.cpp | 28 ++ src/Parsers/ASTAlterQuery.h | 3 + src/Parsers/CommonParsers.h | 1 + src/Parsers/ParserAlterQuery.cpp | 18 ++ src/Storages/IPartitionStrategy.cpp | 48 ---- src/Storages/IPartitionStrategy.h | 17 +- src/Storages/IStorage.h | 25 ++ src/Storages/MergeTree/MergeTreeData.cpp | 259 ++++++++++++++++-- src/Storages/MergeTree/MergeTreeData.h | 17 +- .../MergeTree/MergeTreeExportManifest.h | 35 +++ .../MergeTree/MergeTreeExportStatus.h | 20 ++ src/Storages/MergeTree/MergeTreePartition.cpp | 16 ++ src/Storages/MergeTree/MergeTreePartition.h | 2 + .../MergeTree/MergeTreeSequentialSource.cpp | 3 + .../MergeTree/MergeTreeSequentialSource.h | 1 + .../StorageObjectStorageImporterSink.cpp | 65 +++++ .../StorageObjectStorageImporterSink.h | 54 ++++ .../ObjectStorageFilePathGenerator.h | 79 ++++++ .../ObjectStorage/StorageObjectStorage.cpp | 51 +++- .../ObjectStorage/StorageObjectStorage.h | 10 + .../StorageObjectStorageCluster.cpp | 25 ++ .../StorageObjectStorageCluster.h | 8 + .../StorageObjectStorageConfiguration.cpp | 26 +- .../StorageObjectStorageConfiguration.h | 5 + .../StorageObjectStorageSink.cpp | 5 +- .../ObjectStorage/StorageObjectStorageSink.h | 4 +- src/Storages/PartitionCommands.cpp | 12 + src/Storages/PartitionCommands.h | 1 + src/Storages/PartitionedSink.cpp | 4 +- src/Storages/PartitionedSink.h | 12 +- src/Storages/StorageFile.cpp | 20 +- src/Storages/StorageMergeTree.cpp | 9 +- src/Storages/StorageMergeTree.h | 2 - src/Storages/StorageReplicatedMergeTree.cpp | 9 +- src/Storages/StorageReplicatedMergeTree.h | 2 - src/Storages/StorageURL.cpp | 12 +- src/Storages/System/StorageSystemExports.cpp | 118 ++++++++ src/Storages/System/StorageSystemExports.h | 25 ++ src/Storages/System/attachSystemTables.cpp | 1 + .../01271_show_privileges.reference | 1 + ...erge_tree_part_to_object_storage.reference | 16 ++ ...xport_merge_tree_part_to_object_storage.sh | 38 +++ ...ee_part_to_object_storage_simple.reference | 0 ...rge_tree_part_to_object_storage_simple.sql | 22 ++ ...erge_tree_part_to_object_storage.reference | 16 ++ ...cated_merge_tree_part_to_object_storage.sh | 43 +++ ...ee_part_to_object_storage_simple.reference | 0 ...rge_tree_part_to_object_storage_simple.sql | 22 ++ 62 files changed, 1130 insertions(+), 128 deletions(-) create mode 100644 src/Storages/MergeTree/MergeTreeExportManifest.h create mode 100644 src/Storages/MergeTree/MergeTreeExportStatus.h create mode 100644 src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp create mode 100644 src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h create mode 100644 src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h create mode 100644 src/Storages/System/StorageSystemExports.cpp create mode 100644 src/Storages/System/StorageSystemExports.h create mode 100644 tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference create mode 100755 tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh create mode 100644 tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.reference create mode 100644 tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql create mode 100644 tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference create mode 100755 tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh create mode 100644 tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.reference create mode 100644 tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index 13a9911c702e..9bc46ab31a47 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -210,6 +210,7 @@ enum class AccessType : uint8_t enabled implicitly by the grant ALTER_TABLE */\ M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\ M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \ + M(ALTER_EXPORT_PART, "ALTER EXPORT PART, EXPORT PART", TABLE, ALTER_TABLE) \ M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \ M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \ M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2dcf07466941..411942d62a60 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -130,6 +130,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/Azure) add_headers_and_sources(dbms Storages/ObjectStorage/S3) add_headers_and_sources(dbms Storages/ObjectStorage/HDFS) add_headers_and_sources(dbms Storages/ObjectStorage/Local) +add_headers_and_sources(dbms Storages/ObjectStorage/MergeTree) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/DeltaLake) diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index 004f8a16098c..8c5c18cdc3cd 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -112,6 +112,7 @@ namespace DB DECLARE(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, R"(The number of threads to load inactive set of data parts (Unexpected ones) at startup.)", 0) \ DECLARE(UInt64, max_parts_cleaning_thread_pool_size, 128, R"(The number of threads for concurrent removal of inactive data parts.)", 0) \ DECLARE(UInt64, max_mutations_bandwidth_for_server, 0, R"(The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.)", 0) \ + DECLARE(UInt64, max_exports_bandwidth_for_server, 0, R"(The maximum read speed of all exports on server in bytes per second. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_merges_bandwidth_for_server, 0, R"(The maximum read speed of all merges on server in bytes per second. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.)", 0) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 497760a1325d..f3d0228bbf3f 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7030,6 +7030,9 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"( Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation. )", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \ + DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_part, false, R"( +Experimental export merge tree part. +)", EXPERIMENTAL, allow_experimental_export_merge_tree_part) \ \ DECLARE(String, promql_database, "", R"( Specifies the database name used by the 'promql' dialect. Empty string means the current database. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 1ba731c37bbf..0fd9c302500d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -132,6 +132,15 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"}, {"allow_experimental_insert_into_iceberg", false, false, "New setting."}, /// RELEASE CLOSED + {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"}, + {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"}, + {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"}, + {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"}, + {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."}, + {"object_storage_cluster", "", "", "New setting"}, + {"object_storage_max_nodes", 0, 0, "New setting"}, + {"object_storage_remote_initiator", false, false, "New setting."}, + {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 3ac52499ed66..c089390cfb1a 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -2245,7 +2245,8 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context, if (const auto * alter = query_ptr->as()) { if (alter->isAttachAlter() || alter->isFetchAlter() || alter->isDropPartitionAlter() - || is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot()) + || is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot() + || alter->isExportPartAlter()) return false; if (has_many_shards() || !is_replicated_table(query_ptr)) diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index 1f3a4278f135..9362fb02d517 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -129,6 +129,8 @@ struct RelativePathWithMetadata virtual ~RelativePathWithMetadata() = default; virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); } + virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); } + virtual std::string getPath() const { return relative_path; } virtual bool isArchive() const { return false; } virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 91a63763a7c4..9934eb747f8e 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -324,6 +324,7 @@ namespace ServerSetting extern const ServerSettingsUInt64 max_local_write_bandwidth_for_server; extern const ServerSettingsUInt64 max_merges_bandwidth_for_server; extern const ServerSettingsUInt64 max_mutations_bandwidth_for_server; + extern const ServerSettingsUInt64 max_exports_bandwidth_for_server; extern const ServerSettingsUInt64 max_remote_read_network_bandwidth_for_server; extern const ServerSettingsUInt64 max_remote_write_network_bandwidth_for_server; extern const ServerSettingsUInt64 max_replicated_fetches_network_bandwidth_for_server; @@ -544,6 +545,8 @@ struct ContextSharedPart : boost::noncopyable mutable ThrottlerPtr mutations_throttler; /// A server-wide throttler for mutations mutable ThrottlerPtr merges_throttler; /// A server-wide throttler for merges + mutable ThrottlerPtr exports_throttler; /// A server-wide throttler for exports + MultiVersion macros; /// Substitutions extracted from config. std::unique_ptr ddl_worker TSA_GUARDED_BY(mutex); /// Process ddl commands from zk. LoadTaskPtr ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup @@ -1051,6 +1054,9 @@ struct ContextSharedPart : boost::noncopyable if (auto bandwidth = server_settings[ServerSetting::max_merges_bandwidth_for_server]) merges_throttler = std::make_shared(bandwidth, ProfileEvents::MergesThrottlerBytes, ProfileEvents::MergesThrottlerSleepMicroseconds); + + if (auto bandwidth = server_settings[ServerSetting::max_exports_bandwidth_for_server]) + exports_throttler = std::make_shared(bandwidth, ProfileEvents::ExportsThrottlerBytes, ProfileEvents::ExportsThrottlerSleepMicroseconds); } }; @@ -4148,6 +4154,11 @@ ThrottlerPtr Context::getMergesThrottler() const return shared->merges_throttler; } +ThrottlerPtr Context::getExportsThrottler() const +{ + return shared->exports_throttler; +} + void Context::reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const { if (read_bandwidth) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index a7332ae287dc..e06da1fa4dd3 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1659,6 +1659,7 @@ class Context: public ContextData, public std::enable_shared_from_this ThrottlerPtr getMutationsThrottler() const; ThrottlerPtr getMergesThrottler() const; + ThrottlerPtr getExportsThrottler() const; void reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const; void reloadLocalThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const; diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index adc8c01a0294..28182a3c772c 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -752,7 +752,8 @@ bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr & ast_ddl, const Stora alter->isFreezeAlter() || alter->isUnlockSnapshot() || alter->isMovePartitionToDiskOrVolumeAlter() || - alter->isCommentAlter()) + alter->isCommentAlter() || + alter->isExportPartAlter()) return false; } diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 841e9b768d44..988df44e7049 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -539,6 +539,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS required_access.emplace_back(AccessType::ALTER_DELETE | AccessType::INSERT, database, table); break; } + case ASTAlterCommand::EXPORT_PART: + { + required_access.emplace_back(AccessType::ALTER_EXPORT_PART, database, table); + required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table); + break; + } case ASTAlterCommand::FETCH_PARTITION: { required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table); diff --git a/src/Interpreters/PartLog.cpp b/src/Interpreters/PartLog.cpp index 02c6f6e573b0..aca3b4cf6870 100644 --- a/src/Interpreters/PartLog.cpp +++ b/src/Interpreters/PartLog.cpp @@ -69,6 +69,7 @@ ColumnsDescription PartLogElement::getColumnsDescription() {"MovePart", static_cast(MOVE_PART)}, {"MergePartsStart", static_cast(MERGE_PARTS_START)}, {"MutatePartStart", static_cast(MUTATE_PART_START)}, + {"ExportPart", static_cast(EXPORT_PART)}, } ); @@ -109,7 +110,8 @@ ColumnsDescription PartLogElement::getColumnsDescription() "RemovePart — Removing or detaching a data part using [DETACH PARTITION](/sql-reference/statements/alter/partition#detach-partitionpart)." "MutatePartStart — Mutating of a data part has started, " "MutatePart — Mutating of a data part has finished, " - "MovePart — Moving the data part from the one disk to another one."}, + "MovePart — Moving the data part from the one disk to another one." + "ExportPart — Exporting the data part from a MergeTree table into a target table that represents external storage (e.g., object storage or a data lake).."}, {"merge_reason", std::move(merge_reason_datatype), "The reason for the event with type MERGE_PARTS. Can have one of the following values: " "NotAMerge — The current event has the type other than MERGE_PARTS, " diff --git a/src/Interpreters/PartLog.h b/src/Interpreters/PartLog.h index 44d2fb413c5f..4f58069dae55 100644 --- a/src/Interpreters/PartLog.h +++ b/src/Interpreters/PartLog.h @@ -30,6 +30,7 @@ struct PartLogElement MOVE_PART = 6, MERGE_PARTS_START = 7, MUTATE_PART_START = 8, + EXPORT_PART = 9, }; /// Copy of MergeAlgorithm since values are written to disk. diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index 46ef1aaafee3..eb2315253a0f 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -53,6 +53,8 @@ bool isSupportedAlterTypeForOnClusterDDLQuery(int type) ASTAlterCommand::ATTACH_PARTITION, /// Usually followed by ATTACH PARTITION ASTAlterCommand::FETCH_PARTITION, + /// Data operation that should be executed locally on each replica + ASTAlterCommand::EXPORT_PART, /// Logical error ASTAlterCommand::NO_TYPE, }; diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index 32f2156b5cde..7006e1bdb8f7 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -358,6 +358,29 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett ostr << quoteString(move_destination_name); } } + else if (type == ASTAlterCommand::EXPORT_PART) + { + ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << "PART" + << (settings.hilite ? hilite_none : ""); + partition->format(ostr, settings, state, frame); + ostr << " TO "; + switch (move_destination_type) + { + case DataDestinationType::TABLE: + ostr << "TABLE "; + if (!to_database.empty()) + { + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database) + << (settings.hilite ? hilite_none : "") << "."; + } + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table) + << (settings.hilite ? hilite_none : ""); + return; + default: + break; + } + + } else if (type == ASTAlterCommand::REPLACE_PARTITION) { ostr << (replace ? "REPLACE" : "ATTACH") << " PARTITION " @@ -627,6 +650,11 @@ bool ASTAlterQuery::isMovePartitionToDiskOrVolumeAlter() const return false; } +bool ASTAlterQuery::isExportPartAlter() const +{ + return isOneCommandTypeOnly(ASTAlterCommand::EXPORT_PART); +} + /** Get the text that identifies this element. */ String ASTAlterQuery::getID(char delim) const diff --git a/src/Parsers/ASTAlterQuery.h b/src/Parsers/ASTAlterQuery.h index 3867a86cf797..d8d502cb87c6 100644 --- a/src/Parsers/ASTAlterQuery.h +++ b/src/Parsers/ASTAlterQuery.h @@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST FREEZE_ALL, UNFREEZE_PARTITION, UNFREEZE_ALL, + EXPORT_PART, DELETE, UPDATE, @@ -263,6 +264,8 @@ class ASTAlterQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCl bool isMovePartitionToDiskOrVolumeAlter() const; + bool isExportPartAlter() const; + bool isCommentAlter() const; String getID(char) const override; diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index 057aad6fffea..58694bde8984 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -332,6 +332,7 @@ namespace DB MR_MACROS(MONTHS, "MONTHS") \ MR_MACROS(MOVE_PART, "MOVE PART") \ MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ + MR_MACROS(EXPORT_PART, "EXPORT PART") \ MR_MACROS(MOVE, "MOVE") \ MR_MACROS(MS, "MS") \ MR_MACROS(MUTATION, "MUTATION") \ diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index 4bb76c0d2e4b..775d495492cf 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION); ParserKeyword s_move_partition(Keyword::MOVE_PARTITION); ParserKeyword s_move_part(Keyword::MOVE_PART); + ParserKeyword s_export_part(Keyword::EXPORT_PART); ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION); ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART); ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION); @@ -535,6 +536,23 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected command->move_destination_name = ast_space_name->as().value.safeGet(); } + else if (s_export_part.ignore(pos, expected)) + { + if (!parser_string_and_substituion.parse(pos, command_partition, expected)) + return false; + + command->type = ASTAlterCommand::EXPORT_PART; + command->part = true; + + if (!s_to_table.ignore(pos, expected)) + { + return false; + } + + if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table)) + return false; + command->move_destination_type = DataDestinationType::TABLE; + } else if (s_move_partition.ignore(pos, expected)) { if (!parser_partition.parse(pos, command_partition, expected)) diff --git a/src/Storages/IPartitionStrategy.cpp b/src/Storages/IPartitionStrategy.cpp index 0be6d30f4e7c..0e2f897fb617 100644 --- a/src/Storages/IPartitionStrategy.cpp +++ b/src/Storages/IPartitionStrategy.cpp @@ -264,19 +264,6 @@ ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) return block_with_partition_by_expr.getByName(actions_with_column_name.column_name).column; } -std::string WildcardPartitionStrategy::getPathForRead( - const std::string & prefix) -{ - return prefix; -} - -std::string WildcardPartitionStrategy::getPathForWrite( - const std::string & prefix, - const std::string & partition_key) -{ - return PartitionedSink::replaceWildcards(prefix, partition_key); -} - HiveStylePartitionStrategy::HiveStylePartitionStrategy( KeyDescription partition_key_description_, const Block & sample_block_, @@ -296,41 +283,6 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy( block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set); } -std::string HiveStylePartitionStrategy::getPathForRead(const std::string & prefix) -{ - return prefix + "**." + Poco::toLower(file_format); -} - -std::string HiveStylePartitionStrategy::getPathForWrite( - const std::string & prefix, - const std::string & partition_key) -{ - std::string path; - - if (!prefix.empty()) - { - path += prefix; - if (path.back() != '/') - { - path += '/'; - } - } - - /// Not adding '/' because buildExpressionHive() always adds a trailing '/' - path += partition_key; - - /* - * File extension is toLower(format) - * This isn't ideal, but I guess multiple formats can be specified and introduced. - * So I think it is simpler to keep it this way. - * - * Or perhaps implement something like `IInputFormat::getFileExtension()` - */ - path += std::to_string(generateSnowflakeID()) + "." + Poco::toLower(file_format); - - return path; -} - ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) { Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); diff --git a/src/Storages/IPartitionStrategy.h b/src/Storages/IPartitionStrategy.h index bc90d7f03461..606122b4ae71 100644 --- a/src/Storages/IPartitionStrategy.h +++ b/src/Storages/IPartitionStrategy.h @@ -29,8 +29,12 @@ struct IPartitionStrategy virtual ColumnPtr computePartitionKey(const Chunk & chunk) = 0; - virtual std::string getPathForRead(const std::string & prefix) = 0; - virtual std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) = 0; + ColumnPtr computePartitionKey(Block & block) const + { + actions_with_column_name.actions->execute(block); + + return block.getByName(actions_with_column_name.column_name).column; + } virtual ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) { @@ -53,6 +57,7 @@ struct IPartitionStrategy const KeyDescription partition_key_description; const Block sample_block; ContextPtr context; + PartitionExpressionActionsAndColumnName actions_with_column_name; }; /* @@ -89,11 +94,6 @@ struct WildcardPartitionStrategy : IPartitionStrategy WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_); ColumnPtr computePartitionKey(const Chunk & chunk) override; - std::string getPathForRead(const std::string & prefix) override; - std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override; - -private: - PartitionExpressionActionsAndColumnName actions_with_column_name; }; /* @@ -111,8 +111,6 @@ struct HiveStylePartitionStrategy : IPartitionStrategy bool partition_columns_in_data_file_); ColumnPtr computePartitionKey(const Chunk & chunk) override; - std::string getPathForRead(const std::string & prefix) override; - std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override; ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) override; Block getFormatHeader() override; @@ -121,7 +119,6 @@ struct HiveStylePartitionStrategy : IPartitionStrategy const std::string file_format; const bool partition_columns_in_data_file; std::unordered_set partition_columns_name_set; - PartitionExpressionActionsAndColumnName actions_with_column_name; Block block_without_partition_columns; }; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 0fe6668ff8a2..232b6e46309e 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -450,6 +450,31 @@ class IStorage : public std::enable_shared_from_this, public TypePromo ContextPtr /*context*/, bool /*async_insert*/); + virtual bool supportsImport() const + { + return false; + } + + struct ImportStats + { + ExecutionStatus status; + std::size_t elapsed_ns = 0; + std::size_t bytes_on_disk = 0; + std::size_t read_rows = 0; + std::size_t read_bytes = 0; + std::string file_path = ""; + }; + + virtual SinkToStoragePtr import( + const std::string & /* file_name */, + Block & /* block_with_partition_values */, + ContextPtr /* context */, + std::function /* stats_log */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); + } + + /** Writes the data to a table in distributed manner. * It is supposed that implementation looks into SELECT part of the query and executes distributed * INSERT SELECT if it is possible with current storage as a receiver and query SELECT part as a producer. diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a0b64694f9de..5c55cfcde8d4 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -23,6 +23,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -199,6 +203,8 @@ namespace Setting extern const SettingsUInt64 min_insert_block_size_rows; extern const SettingsUInt64 min_insert_block_size_bytes; extern const SettingsBool apply_patch_parts; + extern const SettingsBool allow_experimental_export_merge_tree_part; + extern const SettingsUInt64 min_bytes_to_use_direct_io; } namespace MergeTreeSetting @@ -316,6 +322,7 @@ namespace ErrorCodes extern const int CANNOT_FORGET_PARTITION; extern const int DATA_TYPE_CANNOT_BE_USED_IN_KEY; extern const int TOO_LARGE_LIGHTWEIGHT_UPDATES; + extern const int UNKNOWN_TABLE; } static void checkSuspiciousIndices(const ASTFunction * index_function) @@ -4490,8 +4497,6 @@ void MergeTreeData::changeSettings( { if (new_settings) { - bool has_storage_policy_changed = false; - const auto & new_changes = new_settings->as().changes; StoragePolicyPtr new_storage_policy = nullptr; @@ -4530,8 +4535,6 @@ void MergeTreeData::changeSettings( disk->createDirectories(fs::path(relative_data_path) / DETACHED_DIR_NAME); } /// FIXME how would that be done while reloading configuration??? - - has_storage_policy_changed = true; } } } @@ -4548,9 +4551,6 @@ void MergeTreeData::changeSettings( StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); new_metadata.setSettingsChanges(new_settings); setInMemoryMetadata(new_metadata); - - if (has_storage_policy_changed) - startBackgroundMovesIfNeeded(); } } @@ -6189,6 +6189,170 @@ void MergeTreeData::movePartitionToTable(const PartitionCommand & command, Conte movePartitionToTable(dest_storage, command.partition, query_context); } +void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextPtr query_context) +{ + if (!query_context->getSettingsRef()[Setting::allow_experimental_export_merge_tree_part]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "Exporting merge tree part is experimental. Set `allow_experimental_export_merge_tree_part` to enable it"); + } + + String dest_database = query_context->resolveDatabase(command.to_database); + auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); + + if (dest_storage->getStorageID() == this->getStorageID()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Exporting to the same table is not allowed"); + } + + if (!dest_storage->supportsImport()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Destination storage {} does not support MergeTree parts or uses unsupported partitioning", dest_storage->getName()); + + auto query_to_string = [] (const ASTPtr & ast) + { + return ast ? ast->formatWithSecretsOneLine() : ""; + }; + + auto src_snapshot = getInMemoryMetadataPtr(); + auto destination_snapshot = dest_storage->getInMemoryMetadataPtr(); + + if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical())) + throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); + + if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST())) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key"); + + auto part_name = command.partition->as().value.safeGet(); + + auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}); + + if (!part) + throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No such data part '{}' to export in table '{}'", + part_name, getStorageID().getFullTableName()); + + { + std::lock_guard lock(export_manifests_mutex); + + if (!export_manifests.emplace(dest_storage->getStorageID(), part).second) + { + throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'", + part_name, dest_storage->getStorageID().getFullTableName()); + } + } + + background_moves_assignee.trigger(); +} + +void MergeTreeData::exportPartToTableImpl( + const MergeTreeExportManifest & manifest, + ContextPtr local_context) +{ + std::function part_log_wrapper = [this, manifest](ImportStats stats) { + const auto & data_part = manifest.data_part; + + writePartLog( + PartLogElement::Type::EXPORT_PART, + stats.status, + stats.elapsed_ns, + data_part->name, + data_part, + {data_part}, + nullptr, + nullptr); + + std::lock_guard inner_lock(export_manifests_mutex); + + export_manifests.erase(manifest); + }; + + auto metadata_snapshot = getInMemoryMetadataPtr(); + Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); + StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, local_context); + + MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export; + + NamesAndTypesList partition_columns; + if (metadata_snapshot->hasPartitionKey()) + { + const auto & partition_key = metadata_snapshot->getPartitionKey(); + if (!partition_key.column_names.empty()) + partition_columns = partition_key.expression->getRequiredColumnsWithTypes(); + } + + auto block_with_partition_values = manifest.data_part->partition.getBlockWithPartitionValues(partition_columns); + + auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, getContext()); + if (!destination_storage) + { + std::lock_guard inner_lock(export_manifests_mutex); + + const auto destination_storage_id_name = manifest.destination_storage_id.getNameForLogs(); + export_manifests.erase(manifest); + throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name); + } + + auto sink = destination_storage->import( + manifest.data_part->name, + block_with_partition_values, + local_context, + part_log_wrapper); + + /// Most likely the file has already been imported, so we can just return + if (!sink) + { + std::lock_guard inner_lock(export_manifests_mutex); + + export_manifests.erase(manifest); + return; + } + + bool apply_deleted_mask = true; + bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk(); + bool prefetch = false; + + MergeTreeData::IMutationsSnapshot::Params params + { + .metadata_version = metadata_snapshot->getMetadataVersion(), + .min_part_metadata_version = manifest.data_part->getMetadataVersion(), + }; + + auto mutations_snapshot = getMutationsSnapshot(params); + + auto alter_conversions = MergeTreeData::getAlterConversionsForPart( + manifest.data_part, + mutations_snapshot, + local_context); + + QueryPlan plan_for_part; + + createReadFromPartStep( + read_type, + plan_for_part, + *this, + storage_snapshot, + RangesInDataPart(manifest.data_part), + alter_conversions, + nullptr, + columns_to_read, + nullptr, + apply_deleted_mask, + std::nullopt, + read_with_direct_io, + prefetch, + local_context, + getLogger("ExportPartition")); + + QueryPlanOptimizationSettings optimization_settings(local_context); + auto pipeline_settings = BuildQueryPipelineSettings(local_context); + auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + + pipeline.complete(sink); + + CompletedPipelineExecutor exec(pipeline); + exec.execute(); +} + void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, ContextPtr /*query_context*/) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MOVE PARTITION TO SHARD is not supported by storage {}", getName()); @@ -6240,6 +6404,11 @@ Pipe MergeTreeData::alterPartition( } } break; + case PartitionCommand::EXPORT_PART: + { + exportPartToTable(command, query_context); + break; + } case PartitionCommand::DROP_DETACHED_PARTITION: dropDetached(command.partition, command.part, query_context); @@ -8571,6 +8740,32 @@ std::pair MergeTreeData::cloneAn return std::make_pair(dst_data_part, std::move(temporary_directory_lock)); } +std::vector MergeTreeData::getExportsStatus() const +{ + std::lock_guard lock(export_manifests_mutex); + std::vector result; + + auto source_database = getStorageID().database_name; + auto source_table = getStorageID().table_name; + + for (const auto & manifest : export_manifests) + { + MergeTreeExportStatus status; + + status.source_database = source_database; + status.source_table = source_table; + status.destination_database = manifest.destination_storage_id.database_name; + status.destination_table = manifest.destination_storage_id.table_name; + status.create_time = manifest.create_time; + status.part_name = manifest.data_part->name; + + result.emplace_back(std::move(status)); + } + + return result; +} + + bool MergeTreeData::canUseAdaptiveGranularity() const { const auto settings = getSettings(); @@ -8957,21 +9152,43 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger() bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee) { - if (parts_mover.moves_blocker.isCancelled()) - return false; + if (!parts_mover.moves_blocker.isCancelled()) + { + auto moving_tagger = selectPartsForMove(); + if (!moving_tagger->parts_to_move.empty()) + { + assignee.scheduleMoveTask(std::make_shared( + [this, moving_tagger] () mutable + { + ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); + WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); + return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; + }, moves_assignee_trigger, getStorageID())); + return true; + } + } - auto moving_tagger = selectPartsForMove(); - if (moving_tagger->parts_to_move.empty()) - return false; + std::lock_guard lock(export_manifests_mutex); - assignee.scheduleMoveTask(std::make_shared( - [this, moving_tagger] () mutable + for (auto & manifest : export_manifests) + { + if (manifest.in_progress) { - ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); - WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); - return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; - }, moves_assignee_trigger, getStorageID())); - return true; + continue; + } + + manifest.in_progress = assignee.scheduleMoveTask(std::make_shared( + [this, manifest] () mutable { + exportPartToTableImpl(manifest, getContext()); + return true; + }, + moves_assignee_trigger, + getStorageID())); + + return manifest.in_progress; + } + + return false; } bool MergeTreeData::areBackgroundMovesNeeded() const @@ -9189,6 +9406,10 @@ bool MergeTreeData::canUsePolymorphicParts() const return canUsePolymorphicParts(*getSettings(), unused); } +void MergeTreeData::startBackgroundMoves() +{ + background_moves_assignee.start(); +} void MergeTreeData::checkDropOrRenameCommandDoesntAffectInProgressMutations( const AlterCommand & command, const std::map & unfinished_mutations, ContextPtr local_context) const diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 2cd69c086473..6a8a7fc55fc6 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -37,6 +37,8 @@ #include #include #include +#include +#include #include #include @@ -979,6 +981,12 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Moves partition to specified Table void movePartitionToTable(const PartitionCommand & command, ContextPtr query_context); + void exportPartToTable(const PartitionCommand & command, ContextPtr query_context); + + void exportPartToTableImpl( + const MergeTreeExportManifest & manifest, + ContextPtr local_context); + /// Checks that Partition could be dropped right now /// Otherwise - throws an exception with detailed information. /// We do not use mutex because it is not very important that the size could change during the operation. @@ -1056,6 +1064,7 @@ class MergeTreeData : public IStorage, public WithMutableContext const WriteSettings & write_settings); virtual std::vector getMutationsStatus() const = 0; + std::vector getExportsStatus() const; /// Returns true if table can create new parts with adaptive granularity /// Has additional constraint in replicated version @@ -1241,6 +1250,10 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Mutex for currently_moving_parts mutable std::mutex moving_parts_mutex; + mutable std::mutex export_manifests_mutex; + + std::set export_manifests; + PinnedPartUUIDsPtr getPinnedPartUUIDs() const; /// Schedules background job to like merge/mutate/fetch an executor @@ -1359,6 +1372,8 @@ class MergeTreeData : public IStorage, public WithMutableContext are_columns_and_secondary_indices_sizes_calculated = false; } + void startBackgroundMoves(); + /// Engine-specific methods BrokenPartCallback broken_part_callback; @@ -1825,8 +1840,6 @@ class MergeTreeData : public IStorage, public WithMutableContext bool canUsePolymorphicParts(const MergeTreeSettings & settings, String & out_reason) const; - virtual void startBackgroundMovesIfNeeded() = 0; - bool allow_nullable_key = false; bool allow_reverse_key = false; diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h new file mode 100644 index 000000000000..89aed701f993 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -0,0 +1,35 @@ +#include +#include + +namespace DB +{ + +struct MergeTreeExportManifest +{ + using DataPartPtr = std::shared_ptr; + + StorageID destination_storage_id; + DataPartPtr data_part; + time_t create_time = time(nullptr); + mutable bool in_progress = false; + + bool operator<(const MergeTreeExportManifest & rhs) const + { + // Lexicographic comparison: first compare destination storage, then part name + auto lhs_storage = destination_storage_id.getQualifiedName(); + auto rhs_storage = rhs.destination_storage_id.getQualifiedName(); + + if (lhs_storage != rhs_storage) + return lhs_storage < rhs_storage; + + return data_part->name < rhs.data_part->name; + } + + bool operator==(const MergeTreeExportManifest & rhs) const + { + return destination_storage_id.getQualifiedName() == rhs.destination_storage_id.getQualifiedName() + && data_part->name == rhs.data_part->name; + } +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeExportStatus.h b/src/Storages/MergeTree/MergeTreeExportStatus.h new file mode 100644 index 000000000000..e71a2f15e6ed --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeExportStatus.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +struct MergeTreeExportStatus +{ + String source_database; + String source_table; + String destination_database; + String destination_table; + time_t create_time = 0; + std::string part_name; +}; + +} diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index a4ab9066bb33..3037f67b23ac 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -466,6 +466,22 @@ void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Bl } } +Block MergeTreePartition::getBlockWithPartitionValues(const NamesAndTypesList & partition_columns) const +{ + chassert(partition_columns.size() == value.size()); + + Block result; + + std::size_t i = 0; + for (const auto & partition_column : partition_columns) + { + auto column = partition_column.type->createColumnConst(1, value[i++]); + result.insert({column, partition_column.type, partition_column.name}); + } + + return result; +} + NamesAndTypesList MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context) { auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context); diff --git a/src/Storages/MergeTree/MergeTreePartition.h b/src/Storages/MergeTree/MergeTreePartition.h index 4338b216cdb8..811cfdc2a90c 100644 --- a/src/Storages/MergeTree/MergeTreePartition.h +++ b/src/Storages/MergeTree/MergeTreePartition.h @@ -60,6 +60,8 @@ struct MergeTreePartition void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context); + Block getBlockWithPartitionValues(const NamesAndTypesList & partition_columns) const; + /// Adjust partition key and execute its expression on block. Return sample block according to used expression. static NamesAndTypesList executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context); diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index d0385d1c7d33..cc43895f0c76 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -168,6 +168,9 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( addThrottler(read_settings.remote_throttler, context->getMergesThrottler()); addThrottler(read_settings.local_throttler, context->getMergesThrottler()); break; + case Export: + read_settings.local_throttler = context->getExportsThrottler(); + break; } MergeTreeReaderSettings reader_settings = diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h index abba230d9e79..a858adf33bb5 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.h +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h @@ -15,6 +15,7 @@ enum MergeTreeSequentialSourceType { Mutation, Merge, + Export, }; /// Create stream for reading single part from MergeTree. diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp new file mode 100644 index 000000000000..82a1bbfc81ea --- /dev/null +++ b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp @@ -0,0 +1,65 @@ + +#include + +namespace DB +{ + +StorageObjectStorageImporterSink::StorageObjectStorageImporterSink( + const std::string & path_, + const ObjectStoragePtr & object_storage_, + const ConfigurationPtr & configuration_, + const std::optional & format_settings_, + const Block & sample_block_, + const std::function & part_log_, + const ContextPtr & context_) + : SinkToStorage(sample_block_) + , object_storage(object_storage_) + , configuration(configuration_) + , format_settings(format_settings_) + , sample_block(sample_block_) + , context(context_) + , part_log(part_log_) +{ + stats.file_path = path_; + sink = std::make_shared( + stats.file_path, + object_storage, + configuration, + format_settings, + sample_block, + context); +} + +String StorageObjectStorageImporterSink::getName() const +{ + return "StorageObjectStorageMergeTreePartImporterSink"; +} + +void StorageObjectStorageImporterSink::consume(Chunk & chunk) +{ + sink->consume(chunk); + stats.read_bytes += chunk.bytes(); + stats.read_rows += chunk.getNumRows(); +} + +void StorageObjectStorageImporterSink::onFinish() +{ + sink->onFinish(); + + if (const auto object_metadata = object_storage->tryGetObjectMetadata(stats.file_path)) + { + stats.bytes_on_disk = object_metadata->size_bytes; + } + + part_log(stats); +} + +void StorageObjectStorageImporterSink::onException(std::exception_ptr exception) +{ + sink->onException(exception); + + stats.status = ExecutionStatus(-1, "Error importing part"); + part_log(stats); +} + +} diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h new file mode 100644 index 000000000000..051f5196f964 --- /dev/null +++ b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +using DataPartPtr = std::shared_ptr; + +/* + * Wrapper around `StorageObjectsStorageSink` that takes care of accounting & metrics for partition export + */ +class StorageObjectStorageImporterSink : public SinkToStorage +{ +public: + using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; + + StorageObjectStorageImporterSink( + const std::string & path_, + const ObjectStoragePtr & object_storage_, + const ConfigurationPtr & configuration_, + const std::optional & format_settings_, + const Block & sample_block_, + const std::function & part_log_, + const ContextPtr & context_); + + String getName() const override; + + void consume(Chunk & chunk) override; + + void onFinish() override; + + void onException(std::exception_ptr exception) override; + +private: + std::shared_ptr sink; + ObjectStoragePtr object_storage; + ConfigurationPtr configuration; + std::optional format_settings; + Block sample_block; + ContextPtr context; + std::function part_log; + + IStorage::ImportStats stats; +}; + +} diff --git a/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h new file mode 100644 index 000000000000..1ed503cbf3c6 --- /dev/null +++ b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h @@ -0,0 +1,79 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + struct ObjectStorageFilePathGenerator + { + virtual ~ObjectStorageFilePathGenerator() = default; + std::string getPathForWrite(const std::string & partition_id) const { + return getPathForWrite(partition_id, ""); + } + virtual std::string getPathForWrite(const std::string & partition_id, const std::string & /* file_name_override */) const = 0; + virtual std::string getPathForRead() const = 0; + }; + + struct ObjectStorageWildcardFilePathGenerator : ObjectStorageFilePathGenerator + { + explicit ObjectStorageWildcardFilePathGenerator(const std::string & raw_path_) : raw_path(raw_path_) {} + + using ObjectStorageFilePathGenerator::getPathForWrite; // Bring base class overloads into scope + std::string getPathForWrite(const std::string & partition_id, const std::string & /* file_name_override */) const override + { + return PartitionedSink::replaceWildcards(raw_path, partition_id); + } + + std::string getPathForRead() const override + { + return raw_path; + } + + private: + std::string raw_path; + + }; + + struct ObjectStorageAppendFilePathGenerator : ObjectStorageFilePathGenerator + { + explicit ObjectStorageAppendFilePathGenerator( + const std::string & raw_path_, + const std::string & file_format_) + : raw_path(raw_path_), file_format(Poco::toLower(file_format_)){} + + using ObjectStorageFilePathGenerator::getPathForWrite; // Bring base class overloads into scope + std::string getPathForWrite(const std::string & partition_id, const std::string & file_name_override) const override + { + std::string result; + + result += raw_path; + + if (raw_path.back() != '/') + { + result += "/"; + } + + /// Not adding '/' because buildExpressionHive() always adds a trailing '/' + result += partition_id; + + const auto file_name = file_name_override.empty() ? std::to_string(generateSnowflakeID()) : file_name_override; + + result += file_name + "." + file_format; + + return result; + } + + std::string getPathForRead() const override + { + return raw_path + "**." + file_format; + } + + private: + std::string raw_path; + std::string file_format; + }; + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 0d333b9a9713..ee780987f98e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -33,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -444,7 +446,8 @@ SinkToStoragePtr StorageObjectStorage::write( if (configuration->partition_strategy) { - return std::make_shared(object_storage, configuration, format_settings, sample_block, local_context); + auto sink_creator = std::make_shared(object_storage, configuration, format_settings, sample_block, local_context); + return std::make_shared(configuration->partition_strategy, sink_creator, local_context, sample_block); } auto paths = configuration->getPaths(); @@ -476,6 +479,48 @@ bool StorageObjectStorage::optimize( return configuration->optimize(metadata_snapshot, context, format_settings); } +bool StorageObjectStorage::supportsImport() const +{ + return configuration->partition_strategy != nullptr && configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE; +} + +SinkToStoragePtr StorageObjectStorage::import( + const std::string & file_name, + Block & block_with_partition_values, + ContextPtr local_context, + std::function part_log) +{ + std::string partition_key; + + if (configuration->partition_strategy) + { + const auto column_with_partition_key = configuration->partition_strategy->computePartitionKey(block_with_partition_values); + + if (!column_with_partition_key->empty()) + { + partition_key = column_with_partition_key->getDataAt(0).toString(); + } + } + + const auto file_path = configuration->getPathForWrite(partition_key, file_name).path; + + if (object_storage->exists(StoredObject(file_path))) + { + LOG_INFO(getLogger("StorageObjectStorage"), "File {} already exists, skipping import", file_path); + return nullptr; + } + + return std::make_shared( + file_path, + object_storage, + configuration, + format_settings, + getInMemoryMetadataPtr()->getSampleBlock(), + part_log, + local_context + ); +} + void StorageObjectStorage::truncate( const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, @@ -647,5 +692,9 @@ void StorageObjectStorage::checkAlterIsPossible(const AlterCommands & commands, configuration->checkAlterIsPossible(commands); } +StorageObjectStorage::Configuration::Path StorageObjectStorage::Configuration::getPathForWrite(const std::string & partition_id, const std::string & filename_override) const +{ + return Path {file_path_generator->getPathForWrite(partition_id, filename_override)}; +} } diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index cead14537e6b..4bc7c3e283b6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -7,6 +7,7 @@ #include #include #include +#include "Storages/ObjectStorage/ObjectStorageFilePathGenerator.h" #include #include #include @@ -76,6 +77,15 @@ class StorageObjectStorage : public IStorage ContextPtr context, bool async_insert) override; + + bool supportsImport() const override; + + SinkToStoragePtr import( + const std::string & /* file_name */, + Block & /* block_with_partition_values */, + ContextPtr /* context */, + std::function /* part_log */) override; + void truncate( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index ea529021429e..415ee23d923f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -100,6 +100,11 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( metadata.setColumns(columns); metadata.setConstraints(constraints_); + if (configuration->getPartitionStrategy()) + { + metadata.partition_key = configuration->getPartitionStrategy()->getPartitionKeyDescription(); + } + setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns)); setInMemoryMetadata(metadata); } @@ -241,4 +246,24 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; } +bool StorageObjectStorageCluster::supportsImport() const +{ + if (pure_storage) + return pure_storage->supportsImport(); + return false; +} + +SinkToStoragePtr StorageObjectStorageCluster::import( + const std::string & file_name, + Block & block_with_partition_values, + ContextPtr context, + std::function part_log) +{ + if (pure_storage) + return pure_storage->import(file_name, block_with_partition_values, context, part_log); + + return IStorageCluster::import(file_name, block_with_partition_values, context, part_log); +} + + } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 6dc9837da134..9a35791a6552 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -33,6 +33,14 @@ class StorageObjectStorageCluster : public IStorageCluster std::optional totalRows(ContextPtr query_context) const override; std::optional totalBytes(ContextPtr query_context) const override; + bool supportsImport() const override; + + SinkToStoragePtr import( + const std::string & /* file_name */, + Block & /* block_with_partition_values */, + ContextPtr /* context */, + std::function /* part_log */) override; + private: void updateQueryToSendIfNeeded( ASTPtr & query, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp index 3699a35f6861..4951aa317e3e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp @@ -89,6 +89,17 @@ void StorageObjectStorageConfiguration::initialize( } } + if (configuration_to_initialize.partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE) + { + configuration_to_initialize.file_path_generator = std::make_shared( + configuration_to_initialize.getRawPath().path, + configuration_to_initialize.format); + } + else + { + configuration_to_initialize.file_path_generator = std::make_shared(configuration_to_initialize.getRawPath().path); + } + if (configuration_to_initialize.format == "auto") { if (configuration_to_initialize.isDataLakeConfiguration()) @@ -106,8 +117,7 @@ void StorageObjectStorageConfiguration::initialize( else FormatFactory::instance().checkFormatName(configuration_to_initialize.format); - /// It might be changed on `StorageObjectStorageConfiguration::initPartitionStrategy` - configuration_to_initialize.read_path = configuration_to_initialize.getRawPath(); + configuration_to_initialize.read_path = file_path_generator->getPathForRead(); configuration_to_initialize.initialized = true; } @@ -137,14 +147,12 @@ const StorageObjectStorageConfiguration::Path & StorageObjectStorageConfiguratio StorageObjectStorageConfiguration::Path StorageObjectStorageConfiguration::getPathForWrite(const std::string & partition_id) const { - auto raw_path = getRawPath(); - - if (!partition_strategy) - { - return raw_path; - } + return getPathForWrite(partition_id, /* filename_override */ ""); +} - return Path {partition_strategy->getPathForWrite(raw_path.path, partition_id)}; +StorageObjectStorage::Configuration::Path StorageObjectStorage::Configuration::getPathForWrite(const std::string & partition_id, const std::string & filename_override) const +{ + return Path {file_path_generator->getPathForWrite(partition_id, filename_override)}; } bool StorageObjectStorageConfiguration::Path::hasPartitionWildcard() const diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index e6527a53b74d..503b381928f1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace DB { @@ -95,6 +96,8 @@ class StorageObjectStorageConfiguration // Path used for writing, it should not be globbed and might contain a partition key Path getPathForWrite(const std::string & partition_id = "") const; + Path getPathForWrite(const std::string & partition_id, const std::string & filename_override) const; + void setPathForRead(const Path & path) { read_path = path; @@ -254,6 +257,8 @@ class StorageObjectStorageConfiguration // Path used for reading, by default it is the same as `getRawPath` // When using `partition_strategy=hive`, a recursive reading pattern will be appended `'table_root/**.parquet' Path read_path; + + std::shared_ptr file_path_generator; }; using StorageObjectStorageConfigurationPtr = std::shared_ptr; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp index a3d51ca76111..fca9b7843599 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp @@ -142,8 +142,7 @@ PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink( std::optional format_settings_, SharedHeader sample_block_, ContextPtr context_) - : PartitionedSink(configuration_->partition_strategy, context_, sample_block_) - , object_storage(object_storage_) + : object_storage(object_storage_) , configuration(configuration_) , query_settings(configuration_->getQuerySettings(context_)) , format_settings(format_settings_) @@ -176,7 +175,7 @@ SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String object_storage, configuration, format_settings, - std::make_shared(partition_strategy->getFormatHeader()), + std::make_shared(configuration->partition_strategy->getFormatHeader()), context ); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.h b/src/Storages/ObjectStorage/StorageObjectStorageSink.h index f4a775030715..39873998ad7a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.h @@ -8,6 +8,8 @@ namespace DB { class StorageObjectStorageSink : public SinkToStorage { +friend class StorageObjectStorageImporterSink; + public: StorageObjectStorageSink( const std::string & path_, @@ -41,7 +43,7 @@ class StorageObjectStorageSink : public SinkToStorage void cancelBuffers(); }; -class PartitionedStorageObjectStorageSink : public PartitionedSink +class PartitionedStorageObjectStorageSink : public PartitionedSink::SinkCreator { public: PartitionedStorageObjectStorageSink( diff --git a/src/Storages/PartitionCommands.cpp b/src/Storages/PartitionCommands.cpp index c12da89d7ed4..96f49a60e511 100644 --- a/src/Storages/PartitionCommands.cpp +++ b/src/Storages/PartitionCommands.cpp @@ -130,6 +130,16 @@ std::optional PartitionCommand::parse(const ASTAlterCommand * res.with_name = command_ast->with_name; return res; } + if (command_ast->type == ASTAlterCommand::EXPORT_PART) + { + PartitionCommand res; + res.type = EXPORT_PART; + res.partition = command_ast->partition->clone(); + res.part = command_ast->part; + res.to_database = command_ast->to_database; + res.to_table = command_ast->to_table; + return res; + } return {}; } @@ -171,6 +181,8 @@ std::string PartitionCommand::typeToString() const return "UNFREEZE ALL"; case PartitionCommand::Type::REPLACE_PARTITION: return "REPLACE PARTITION"; + case PartitionCommand::Type::EXPORT_PART: + return "EXPORT PART"; default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Uninitialized partition command"); } diff --git a/src/Storages/PartitionCommands.h b/src/Storages/PartitionCommands.h index 917e510f24b4..15d2a7fb869f 100644 --- a/src/Storages/PartitionCommands.h +++ b/src/Storages/PartitionCommands.h @@ -33,6 +33,7 @@ struct PartitionCommand UNFREEZE_ALL_PARTITIONS, UNFREEZE_PARTITION, REPLACE_PARTITION, + EXPORT_PART, }; Type type = UNKNOWN; diff --git a/src/Storages/PartitionedSink.cpp b/src/Storages/PartitionedSink.cpp index 078237483154..2a3df191dd92 100644 --- a/src/Storages/PartitionedSink.cpp +++ b/src/Storages/PartitionedSink.cpp @@ -26,10 +26,12 @@ namespace ErrorCodes PartitionedSink::PartitionedSink( std::shared_ptr partition_strategy_, + std::shared_ptr sink_creator_, ContextPtr context_, SharedHeader source_header_) : SinkToStorage(source_header_) , partition_strategy(partition_strategy_) + , sink_creator(sink_creator_) , context(context_) , source_header(source_header_) { @@ -41,7 +43,7 @@ SinkPtr PartitionedSink::getSinkForPartitionKey(StringRef partition_key) auto it = partition_id_to_sink.find(partition_key); if (it == partition_id_to_sink.end()) { - auto sink = createSinkForPartition(partition_key.toString()); + auto sink = sink_creator->createSinkForPartition(partition_key.toString()); std::tie(it, std::ignore) = partition_id_to_sink.emplace(partition_key, sink); } diff --git a/src/Storages/PartitionedSink.h b/src/Storages/PartitionedSink.h index 444624ba6c8e..bc446477e9dd 100644 --- a/src/Storages/PartitionedSink.h +++ b/src/Storages/PartitionedSink.h @@ -17,10 +17,17 @@ namespace DB class PartitionedSink : public SinkToStorage { public: + struct SinkCreator + { + virtual ~SinkCreator() = default; + virtual SinkPtr createSinkForPartition(const String & partition_id) = 0; + }; + static constexpr auto PARTITION_ID_WILDCARD = "{_partition_id}"; PartitionedSink( std::shared_ptr partition_strategy_, + std::shared_ptr sink_creator_, ContextPtr context_, SharedHeader source_header_); @@ -34,16 +41,15 @@ class PartitionedSink : public SinkToStorage void onFinish() override; - virtual SinkPtr createSinkForPartition(const String & partition_id) = 0; - static void validatePartitionKey(const String & str, bool allow_slash); static String replaceWildcards(const String & haystack, const String & partition_id); + protected: std::shared_ptr partition_strategy; - private: + std::shared_ptr sink_creator; ContextPtr context; SharedHeader source_header; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index d57ca4f996a6..a8d29ff96365 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -1989,7 +1989,7 @@ class StorageFileSink final : public SinkToStorage, WithContext std::unique_lock lock; }; -class PartitionedStorageFileSink : public PartitionedSink +class PartitionedStorageFileSink : public PartitionedSink::SinkCreator { public: PartitionedStorageFileSink( @@ -2004,7 +2004,7 @@ class PartitionedStorageFileSink : public PartitionedSink const String format_name_, ContextPtr context_, int flags_) - : PartitionedSink(partition_strategy_, context_, std::make_shared(metadata_snapshot_->getSampleBlock())) + : partition_strategy(partition_strategy_) , path(path_) , metadata_snapshot(metadata_snapshot_) , table_name_for_log(table_name_for_log_) @@ -2020,11 +2020,12 @@ class PartitionedStorageFileSink : public PartitionedSink SinkPtr createSinkForPartition(const String & partition_id) override { - std::string filepath = partition_strategy->getPathForWrite(path, partition_id); + const auto file_path_generator = std::make_shared(path); + std::string filepath = file_path_generator->getPathForWrite(partition_id); fs::create_directories(fs::path(filepath).parent_path()); - validatePartitionKey(filepath, true); + PartitionedSink::validatePartitionKey(filepath, true); checkCreationIsAllowed(context, context->getUserFilesPath(), filepath, /*can_be_directory=*/ true); return std::make_shared( metadata_snapshot, @@ -2041,6 +2042,7 @@ class PartitionedStorageFileSink : public PartitionedSink } private: + std::shared_ptr partition_strategy; const String path; StorageMetadataPtr metadata_snapshot; String table_name_for_log; @@ -2092,7 +2094,7 @@ SinkToStoragePtr StorageFile::write( has_wildcards, /* partition_columns_in_data_file */true); - return std::make_shared( + auto sink_creator = std::make_shared( partition_strategy, metadata_snapshot, getStorageID().getNameForLogs(), @@ -2104,6 +2106,13 @@ SinkToStoragePtr StorageFile::write( format_name, context, flags); + + return std::make_shared( + partition_strategy, + sink_creator, + context, + metadata_snapshot->getSampleBlock() + ); } String path; @@ -2129,6 +2138,7 @@ SinkToStoragePtr StorageFile::write( String new_path; do { + new_path = path.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : path.substr(pos)); ++index; } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index aae11a957aaa..d200656586da 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -120,6 +120,7 @@ namespace ErrorCodes extern const int TABLE_IS_READ_ONLY; extern const int TOO_MANY_PARTS; extern const int PART_IS_LOCKED; + extern const int INCOMPATIBLE_COLUMNS; } namespace ActionLocks @@ -209,7 +210,7 @@ void StorageMergeTree::startup() try { background_operations_assignee.start(); - startBackgroundMovesIfNeeded(); + startBackgroundMoves(); startOutdatedAndUnexpectedDataPartsLoadingTask(); } catch (...) @@ -2813,12 +2814,6 @@ MutationCounters StorageMergeTree::getMutationCounters() const return mutation_counters; } -void StorageMergeTree::startBackgroundMovesIfNeeded() -{ - if (areBackgroundMovesNeeded()) - background_moves_assignee.start(); -} - std::unique_ptr StorageMergeTree::getDefaultSettings() const { return std::make_unique(getContext()->getMergeTreeSettings()); diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 00cbc7acdad8..0bffa6ead7d3 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -286,8 +286,6 @@ class StorageMergeTree final : public MergeTreeData std::unique_ptr fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock); std::unique_ptr fillNewPartNameAndResetLevel(MutableDataPartPtr & part, DataPartsLock & lock); - void startBackgroundMovesIfNeeded() override; - BackupEntries backupMutations(UInt64 version, const String & data_path_in_backup) const; /// Attaches restored parts to the storage. diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index b0951c52f99c..db5c07bd8cd5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -5634,7 +5634,7 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread, const ZooK restarting_thread.start(true); }); - startBackgroundMovesIfNeeded(); + startBackgroundMoves(); part_moves_between_shards_orchestrator.start(); @@ -9842,13 +9842,6 @@ MutationCounters StorageReplicatedMergeTree::getMutationCounters() const return queue.getMutationCounters(); } -void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded() -{ - if (areBackgroundMovesNeeded()) - background_moves_assignee.start(); -} - - std::unique_ptr StorageReplicatedMergeTree::getDefaultSettings() const { return std::make_unique(getContext()->getReplicatedMergeTreeSettings()); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 487c3a3f44c0..5abf14c1400d 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -955,8 +955,6 @@ class StorageReplicatedMergeTree final : public MergeTreeData MutationsSnapshotPtr getMutationsSnapshot(const IMutationsSnapshot::Params & params) const override; - void startBackgroundMovesIfNeeded() override; - /// Attaches restored parts to the storage. void attachRestoredParts(MutableDataPartsVector && parts) override; diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index a8236dacc02d..4ba18d3aae7a 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -727,7 +727,7 @@ void StorageURLSink::cancelBuffers() write_buf->cancel(); } -class PartitionedStorageURLSink : public PartitionedSink +class PartitionedStorageURLSink : public PartitionedSink::SinkCreator { public: PartitionedStorageURLSink( @@ -741,7 +741,7 @@ class PartitionedStorageURLSink : public PartitionedSink const CompressionMethod compression_method_, const HTTPHeaderEntries & headers_, const String & http_method_) - : PartitionedSink(partition_strategy_, context_, std::make_shared(sample_block_)) + : partition_strategy(partition_strategy_) , uri(uri_) , format(format_) , format_settings(format_settings_) @@ -756,7 +756,8 @@ class PartitionedStorageURLSink : public PartitionedSink SinkPtr createSinkForPartition(const String & partition_id) override { - std::string partition_path = partition_strategy->getPathForWrite(uri, partition_id); + const auto file_path_generator = std::make_shared(uri); + std::string partition_path = file_path_generator->getPathForWrite(partition_id); context->getRemoteHostFilter().checkURL(Poco::URI(partition_path)); return std::make_shared( @@ -764,6 +765,7 @@ class PartitionedStorageURLSink : public PartitionedSink } private: + std::shared_ptr partition_strategy; const String uri; const String format; const std::optional format_settings; @@ -1445,7 +1447,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad has_wildcards, /* partition_columns_in_data_file */true); - return std::make_shared( + auto sink_creator = std::make_shared( partition_strategy, uri, format_name, @@ -1456,6 +1458,8 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad compression_method, headers, http_method); + + return std::make_shared(partition_strategy, sink_creator, context, metadata_snapshot->getSampleBlock()); } return std::make_shared( diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp new file mode 100644 index 000000000000..8a25b4188426 --- /dev/null +++ b/src/Storages/System/StorageSystemExports.cpp @@ -0,0 +1,118 @@ +#include +#include +#include +#include +#include "Columns/ColumnString.h" +#include "DataTypes/DataTypeString.h" +#include +#include "Storages/VirtualColumnUtils.h" +#include +#include +#include + + +namespace DB +{ + +ColumnsDescription StorageSystemExports::getColumnsDescription() +{ + return ColumnsDescription + { + {"source_database", std::make_shared(), "Name of the source database."}, + {"source_table", std::make_shared(), "Name of the source table."}, + {"destination_database", std::make_shared(), "Name of the destination database."}, + {"destination_table", std::make_shared(), "Name of the destination table."}, + {"create_time", std::make_shared(), "Date and time when the export command was submitted for execution."}, + {"part_name", std::make_shared(), "Name of the part"} + }; +} + +void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector) const +{ + const auto access = context->getAccess(); + const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES); + + /// Collect a set of *MergeTree tables. + std::map> merge_tree_tables; + for (const auto & db : DatabaseCatalog::instance().getDatabases()) + { + /// Check if database can contain MergeTree tables + if (!db.second->canContainMergeTreeTables()) + continue; + + const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); + + for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) + { + const auto & table = iterator->table(); + if (!table) + continue; + + if (!dynamic_cast(table.get())) + continue; + + if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) + continue; + + merge_tree_tables[db.first][iterator->name()] = table; + } + } + + MutableColumnPtr col_source_database_export = ColumnString::create(); + MutableColumnPtr col_source_table_export = ColumnString::create(); + + for (auto & db : merge_tree_tables) + { + for (auto & table : db.second) + { + col_source_database_export->insert(db.first); + col_source_table_export->insert(table.first); + } + } + + ColumnPtr col_source_database = std::move(col_source_database_export); + ColumnPtr col_source_table = std::move(col_source_table_export); + + /// Determine what tables are needed by the conditions in the query. + { + Block filtered_block + { + { col_source_database, std::make_shared(), "source_database" }, + { col_source_table, std::make_shared(), "source_table" }, + }; + + VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context); + + if (!filtered_block.rows()) + return; + + col_source_database = filtered_block.getByName("source_database").column; + col_source_table = filtered_block.getByName("source_table").column; + } + + for (size_t i_storage = 0; i_storage < col_source_database->size(); ++i_storage) + { + auto database = (*col_source_database)[i_storage].safeGet(); + auto table = (*col_source_table)[i_storage].safeGet(); + + std::vector statuses; + { + const IStorage * storage = merge_tree_tables[database][table].get(); + if (const auto * merge_tree = dynamic_cast(storage)) + statuses = merge_tree->getExportsStatus(); + } + + for (const MergeTreeExportStatus & status : statuses) + { + size_t col_num = 0; + res_columns[col_num++]->insert(status.source_database); + res_columns[col_num++]->insert(status.source_table); + res_columns[col_num++]->insert(status.destination_database); + res_columns[col_num++]->insert(status.destination_table); + res_columns[col_num++]->insert(status.create_time); + res_columns[col_num++]->insert(status.part_name); + } + } +} + +} diff --git a/src/Storages/System/StorageSystemExports.h b/src/Storages/System/StorageSystemExports.h new file mode 100644 index 000000000000..e13fbfa26aaa --- /dev/null +++ b/src/Storages/System/StorageSystemExports.h @@ -0,0 +1,25 @@ +#pragma once + +#include + + +namespace DB +{ + +class Context; + + +class StorageSystemExports final : public IStorageSystemOneBlock +{ +public: + std::string getName() const override { return "SystemExports"; } + + static ColumnsDescription getColumnsDescription(); + +protected: + using IStorageSystemOneBlock::IStorageSystemOneBlock; + + void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector) const override; +}; + +} diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index 9e996d5ca2b9..e8abd22b97d1 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -103,6 +103,7 @@ #include #include #include +#include #include #include diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference index 716d0cd00634..73f40638bfb5 100644 --- a/tests/queries/0_stateless/01271_show_privileges.reference +++ b/tests/queries/0_stateless/01271_show_privileges.reference @@ -43,6 +43,7 @@ ALTER TTL ['ALTER MODIFY TTL','MODIFY TTL'] TABLE ALTER TABLE ALTER MATERIALIZE TTL ['MATERIALIZE TTL'] TABLE ALTER TABLE ALTER SETTINGS ['ALTER SETTING','ALTER MODIFY SETTING','MODIFY SETTING','RESET SETTING'] TABLE ALTER TABLE ALTER MOVE PARTITION ['ALTER MOVE PART','MOVE PARTITION','MOVE PART'] TABLE ALTER TABLE +ALTER EXPORT PART ['ALTER EXPORT PART','EXPORT PART'] TABLE ALTER TABLE ALTER FETCH PARTITION ['ALTER FETCH PART','FETCH PARTITION'] TABLE ALTER TABLE ALTER FREEZE PARTITION ['FREEZE PARTITION','UNFREEZE'] TABLE ALTER TABLE ALTER UNLOCK SNAPSHOT ['UNLOCK SNAPSHOT'] TABLE ALTER TABLE diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference new file mode 100644 index 000000000000..75c32111f7a3 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference @@ -0,0 +1,16 @@ +---- Export 2020_1_1_0 and 2021_2_2_0 +---- Both data parts should appear +test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1 +test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2 +test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3 +test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4 +---- Export the same part again, it should be idempotent +test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1 +test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2 +test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3 +test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4 +---- Data in roundtrip MergeTree table (should match s3_table) +1 2020 +2 2020 +3 2020 +4 2021 diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh new file mode 100755 index 000000000000..5e886eaf4755 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +mt_table="mt_table_${RANDOM}" +s3_table="s3_table_${RANDOM}" +mt_table_roundtrip="mt_table_roundtrip_${RANDOM}" + +query() { + $CLICKHOUSE_CLIENT --query "$1" +} + +query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip" + +query "CREATE TABLE $mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()" +query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year" + +query "INSERT INTO $mt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)" +echo "---- Export 2020_1_1_0 and 2021_2_2_0" +query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" +query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +echo "---- Both data parts should appear" +query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id" + +echo "---- Export the same part again, it should be idempotent" +query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id" + +query "CREATE TABLE $mt_table_roundtrip ENGINE = MergeTree() PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table" + +echo "---- Data in roundtrip MergeTree table (should match s3_table)" +query "SELECT DISTINCT ON (id) * FROM $mt_table_roundtrip ORDER BY id" + +query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip" diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql new file mode 100644 index 000000000000..136b12142383 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql @@ -0,0 +1,22 @@ +-- Tags: no-parallel + +DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table; + +CREATE TABLE 03572_mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple(); + +INSERT INTO 03572_mt_table VALUES (1, 2020); + +-- Create a table with a different partition key and export a partition to it. It should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x; + +ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS} + +DROP TABLE 03572_invalid_schema_table; + +-- The only partition strategy that supports exports is hive. Wildcard should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table/{_partition_id}', format='Parquet', partition_strategy='wildcard') PARTITION BY (id, year); + +ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NOT_IMPLEMENTED} + +DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table; diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference new file mode 100644 index 000000000000..07f1ec6376a6 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference @@ -0,0 +1,16 @@ +---- Get actual part names and export them +---- Both data parts should appear +1 2020 +2 2020 +3 2020 +4 2021 +---- Export the same part again, it should be idempotent +1 2020 +2 2020 +3 2020 +4 2021 +---- Data in roundtrip ReplicatedMergeTree table (should match s3_table) +1 2020 +2 2020 +3 2020 +4 2021 diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh new file mode 100755 index 000000000000..3b955d4bbbe5 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash +# Tags: replica, no-parallel, no-replicated-database + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +rmt_table="rmt_table_${RANDOM}" +s3_table="s3_table_${RANDOM}" +rmt_table_roundtrip="rmt_table_roundtrip_${RANDOM}" + +query() { + $CLICKHOUSE_CLIENT --query "$1" +} + +query "DROP TABLE IF EXISTS $rmt_table, $s3_table, $rmt_table_roundtrip" + +query "CREATE TABLE $rmt_table (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$rmt_table', 'replica1') PARTITION BY year ORDER BY tuple()" +query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year" + +query "INSERT INTO $rmt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)" + +echo "---- Get actual part names and export them" +part_2020=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$rmt_table' AND partition = '2020' ORDER BY name LIMIT 1" | tr -d '\n') +part_2021=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$rmt_table' AND partition = '2021' ORDER BY name LIMIT 1" | tr -d '\n') + +query "ALTER TABLE $rmt_table EXPORT PART '$part_2020' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" +query "ALTER TABLE $rmt_table EXPORT PART '$part_2021' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +echo "---- Both data parts should appear" +query "SELECT * FROM $s3_table ORDER BY id" + +echo "---- Export the same part again, it should be idempotent" +query "ALTER TABLE $rmt_table EXPORT PART '$part_2020' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +query "SELECT * FROM $s3_table ORDER BY id" + +query "CREATE TABLE $rmt_table_roundtrip ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$rmt_table_roundtrip', 'replica1') PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table" + +echo "---- Data in roundtrip ReplicatedMergeTree table (should match s3_table)" +query "SELECT * FROM $rmt_table_roundtrip ORDER BY id" + +query "DROP TABLE IF EXISTS $rmt_table, $s3_table, $rmt_table_roundtrip" diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.reference b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql new file mode 100644 index 000000000000..e0aa82532190 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql @@ -0,0 +1,22 @@ +-- Tags: no-parallel + +DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table; + +CREATE TABLE 03572_rmt_table (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_03572_rmt/03572_rmt_table', 'replica1') PARTITION BY year ORDER BY tuple(); + +INSERT INTO 03572_rmt_table VALUES (1, 2020); + +-- Create a table with a different partition key and export a partition to it. It should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x; + +ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_invalid_schema_table +SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS} + +DROP TABLE 03572_invalid_schema_table; + +-- The only partition strategy that supports exports is hive. Wildcard should throw +CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table/{_partition_id}', format='Parquet', partition_strategy='wildcard') PARTITION BY (id, year); + +ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_invalid_schema_table SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NOT_IMPLEMENTED} + +DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table; From 5cbcae43cce8d0281ec1b68a01eb504b688d1298 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 29 Sep 2025 10:34:41 -0300 Subject: [PATCH 02/14] opsy --- src/Common/ProfileEvents.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index e6dee19233e0..3e733145dce0 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -171,6 +171,8 @@ M(MergesThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_merges_bandwidth_for_server' throttling.", ValueType::Microseconds) \ M(MutationsThrottlerBytes, "Bytes passed through 'max_mutations_bandwidth_for_server' throttler.", ValueType::Bytes) \ M(MutationsThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_mutations_bandwidth_for_server' throttling.", ValueType::Microseconds) \ + M(ExportsThrottlerBytes, "Bytes passed through 'max_exports_bandwidth_for_server' throttler.", ValueType::Bytes) \ + M(ExportsThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_exports_bandwidth_for_server' throttling.", ValueType::Microseconds) \ M(QueryRemoteReadThrottlerBytes, "Bytes passed through 'max_remote_read_network_bandwidth' throttler.", ValueType::Bytes) \ M(QueryRemoteReadThrottlerSleepMicroseconds, "Total time a query was sleeping to conform 'max_remote_read_network_bandwidth' throttling.", ValueType::Microseconds) \ M(QueryRemoteWriteThrottlerBytes, "Bytes passed through 'max_remote_write_network_bandwidth' throttler.", ValueType::Bytes) \ From 55cc3f947437cb403de7c56cada8fdc161e8226f Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Mon, 29 Sep 2025 15:32:14 +0200 Subject: [PATCH 03/14] Merge pull request #1017 from Altinity/simple_export_part improve observability a bit, simplify sink --- src/Common/CurrentMetrics.cpp | 1 + src/Common/ProfileEvents.cpp | 4 + src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 1 + src/Interpreters/Context.cpp | 4 + src/Interpreters/Context.h | 4 + src/Storages/IStorage.h | 26 ++-- src/Storages/MergeTree/ExportList.cpp | 66 ++++++++ src/Storages/MergeTree/ExportList.h | 90 +++++++++++ src/Storages/MergeTree/MergeTreeData.cpp | 143 ++++++++++++++---- src/Storages/MergeTree/MergeTreeData.h | 4 +- .../MergeTree/MergeTreeExportManifest.h | 17 ++- .../StorageObjectStorageImporterSink.cpp | 65 -------- .../StorageObjectStorageImporterSink.h | 54 ------- .../ObjectStorage/StorageObjectStorage.cpp | 25 ++- .../ObjectStorage/StorageObjectStorage.h | 5 +- .../StorageObjectStorageCluster.cpp | 9 +- .../StorageObjectStorageCluster.h | 5 +- src/Storages/System/StorageSystemExports.cpp | 122 +++++---------- src/Storages/System/StorageSystemMerges.cpp | 2 +- src/Storages/System/attachSystemTables.cpp | 2 + ...erge_tree_part_to_object_storage.reference | 16 +- ...xport_merge_tree_part_to_object_storage.sh | 6 +- 23 files changed, 387 insertions(+), 287 deletions(-) create mode 100644 src/Storages/MergeTree/ExportList.cpp create mode 100644 src/Storages/MergeTree/ExportList.h delete mode 100644 src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp delete mode 100644 src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 41b16996ba74..f1f2604918ea 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -10,6 +10,7 @@ M(Merge, "Number of executing background merges") \ M(MergeParts, "Number of source parts participating in current background merges") \ M(Move, "Number of currently executing moves") \ + M(Export, "Number of currently executing exports") \ M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \ M(ReplicatedFetch, "Number of data parts being fetched from replica") \ M(ReplicatedSend, "Number of data parts being sent to replicas") \ diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 3e733145dce0..22e5abb3df56 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -28,6 +28,10 @@ M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.", ValueType::Bytes) \ M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.", ValueType::Number) \ M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.", ValueType::Number) \ + M(PartsExports, "Number of successful part exports.", ValueType::Number) \ + M(PartsExportFailures, "Number of failed part exports.", ValueType::Number) \ + M(PartsExportDuplicated, "Number of part exports that failed because target already exists.", ValueType::Number) \ + M(PartsExportTotalMilliseconds, "Total time spent on part export operations.", ValueType::Milliseconds) \ M(FailedQuery, "Number of failed queries.", ValueType::Number) \ M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.", ValueType::Number) \ M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.", ValueType::Number) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index f3d0228bbf3f..9844624014a8 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6854,6 +6854,9 @@ Possible values: )", 0) \ DECLARE(Bool, use_roaring_bitmap_iceberg_positional_deletes, false, R"( Use roaring bitmap for iceberg positional deletes. +)", 0) \ + DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"( +Overwrite file if it already exists when exporting a merge tree part )", 0) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 0fd9c302500d..34a630ab7299 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -141,6 +141,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_max_nodes", 0, 0, "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 9934eb747f8e..3e761809efb1 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -504,6 +505,7 @@ struct ContextSharedPart : boost::noncopyable GlobalOvercommitTracker global_overcommit_tracker; MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree) MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree) + ExportsList exports_list; /// The list of executing exports (for (Replicated)?MergeTree) ReplicatedFetchList replicated_fetch_list; RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView) ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections. @@ -1214,6 +1216,8 @@ MergeList & Context::getMergeList() { return shared->merge_list; } const MergeList & Context::getMergeList() const { return shared->merge_list; } MovesList & Context::getMovesList() { return shared->moves_list; } const MovesList & Context::getMovesList() const { return shared->moves_list; } +ExportsList & Context::getExportsList() { return shared->exports_list; } +const ExportsList & Context::getExportsList() const { return shared->exports_list; } ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; } const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; } RefreshSet & Context::getRefreshSet() { return shared->refresh_set; } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index e06da1fa4dd3..ea36574e9356 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -89,6 +89,7 @@ class AsynchronousMetrics; class BackgroundSchedulePool; class MergeList; class MovesList; +class ExportsList; class ReplicatedFetchList; class RefreshSet; class Cluster; @@ -1165,6 +1166,9 @@ class Context: public ContextData, public std::enable_shared_from_this MovesList & getMovesList(); const MovesList & getMovesList() const; + ExportsList & getExportsList(); + const ExportsList & getExportsList() const; + ReplicatedFetchList & getReplicatedFetchList(); const ReplicatedFetchList & getReplicatedFetchList() const; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 232b6e46309e..c5c870696969 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -455,24 +455,20 @@ class IStorage : public std::enable_shared_from_this, public TypePromo return false; } - struct ImportStats - { - ExecutionStatus status; - std::size_t elapsed_ns = 0; - std::size_t bytes_on_disk = 0; - std::size_t read_rows = 0; - std::size_t read_bytes = 0; - std::string file_path = ""; - }; - + /* +It is currently only implemented in StorageObjectStorage. + It is meant to be used to import merge tree data parts into object storage. It is similar to the write API, + but it won't re-partition the data and should allow the filename to be set by the caller. + */ virtual SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - ContextPtr /* context */, - std::function /* stats_log */) - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); - } + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, + ContextPtr /* context */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); + } /** Writes the data to a table in distributed manner. diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp new file mode 100644 index 000000000000..63ecc6d7c1f1 --- /dev/null +++ b/src/Storages/MergeTree/ExportList.cpp @@ -0,0 +1,66 @@ +#include + +namespace DB +{ + +ExportsListElement::ExportsListElement( + const StorageID & source_table_id_, + const StorageID & destination_table_id_, + UInt64 part_size_, + const String & part_name_, + const String & target_file_name_, + UInt64 total_rows_to_read_, + UInt64 total_size_bytes_compressed_, + UInt64 total_size_bytes_uncompressed_, + time_t create_time_, + const ContextPtr & context) +: source_table_id(source_table_id_) +, destination_table_id(destination_table_id_) +, part_size(part_size_) +, part_name(part_name_) +, destination_file_path(target_file_name_) +, total_rows_to_read(total_rows_to_read_) +, total_size_bytes_compressed(total_size_bytes_compressed_) +, total_size_bytes_uncompressed(total_size_bytes_uncompressed_) +, create_time(create_time_) +{ + thread_group = ThreadGroup::createForBackgroundProcess(context); +} + +ExportsListElement::~ExportsListElement() +{ + background_memory_tracker.adjustOnBackgroundTaskEnd(&thread_group->memory_tracker); +} + +ExportInfo ExportsListElement::getInfo() const +{ + ExportInfo res; + res.source_database = source_table_id.database_name; + res.source_table = source_table_id.table_name; + res.destination_database = destination_table_id.database_name; + res.destination_table = destination_table_id.table_name; + res.part_name = part_name; + res.destination_file_path = destination_file_path; + res.rows_read = rows_read; + res.total_rows_to_read = total_rows_to_read; + res.total_size_bytes_compressed = total_size_bytes_compressed; + res.total_size_bytes_uncompressed = total_size_bytes_uncompressed; + res.bytes_read_uncompressed = bytes_read_uncompressed; + res.memory_usage = getMemoryUsage(); + res.peak_memory_usage = getPeakMemoryUsage(); + res.create_time = create_time; + res.elapsed = elapsed; + return res; +} + +UInt64 ExportsListElement::getMemoryUsage() const +{ + return thread_group->memory_tracker.get(); +} + +UInt64 ExportsListElement::getPeakMemoryUsage() const +{ + return thread_group->memory_tracker.getPeak(); +} + +} diff --git a/src/Storages/MergeTree/ExportList.h b/src/Storages/MergeTree/ExportList.h new file mode 100644 index 000000000000..ade18b69480c --- /dev/null +++ b/src/Storages/MergeTree/ExportList.h @@ -0,0 +1,90 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace CurrentMetrics +{ + extern const Metric Export; +} + +namespace DB +{ + +struct ExportInfo +{ + String source_database; + String source_table; + String destination_database; + String destination_table; + String part_name; + String destination_file_path; + UInt64 rows_read; + UInt64 total_rows_to_read; + UInt64 total_size_bytes_compressed; + UInt64 total_size_bytes_uncompressed; + UInt64 bytes_read_uncompressed; + UInt64 memory_usage; + UInt64 peak_memory_usage; + time_t create_time = 0; + Float64 elapsed; +}; + +struct ExportsListElement : private boost::noncopyable +{ + const StorageID source_table_id; + const StorageID destination_table_id; + const UInt64 part_size; + const String part_name; + const String destination_file_path; + UInt64 rows_read {0}; + UInt64 total_rows_to_read {0}; + UInt64 total_size_bytes_compressed {0}; + UInt64 total_size_bytes_uncompressed {0}; + UInt64 bytes_read_uncompressed {0}; + time_t create_time {0}; + Float64 elapsed {0}; + + Stopwatch watch; + ThreadGroupPtr thread_group; + + ExportsListElement( + const StorageID & source_table_id_, + const StorageID & destination_table_id_, + UInt64 part_size_, + const String & part_name_, + const String & destination_file_path_, + UInt64 total_rows_to_read_, + UInt64 total_size_bytes_compressed_, + UInt64 total_size_bytes_uncompressed_, + time_t create_time_, + const ContextPtr & context); + + ~ExportsListElement(); + + ExportInfo getInfo() const; + + UInt64 getMemoryUsage() const; + UInt64 getPeakMemoryUsage() const; +}; + + +class ExportsList final : public BackgroundProcessList +{ +private: + using Parent = BackgroundProcessList; + +public: + ExportsList() + : Parent(CurrentMetrics::Export) + {} +}; + +using ExportsListEntry = BackgroundProcessListEntry; + +} diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 5c55cfcde8d4..2faebe8e8a7b 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -118,6 +119,7 @@ #include #include #include +#include #include #include @@ -160,6 +162,10 @@ namespace ProfileEvents extern const Event LoadedDataPartsMicroseconds; extern const Event RestorePartsSkippedFiles; extern const Event RestorePartsSkippedBytes; + extern const Event PartsExports; + extern const Event PartsExportTotalMilliseconds; + extern const Event PartsExportFailures; + extern const Event PartsExportDuplicated; } namespace CurrentMetrics @@ -205,6 +211,8 @@ namespace Setting extern const SettingsBool apply_patch_parts; extern const SettingsBool allow_experimental_export_merge_tree_part; extern const SettingsUInt64 min_bytes_to_use_direct_io; + extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists; + extern const SettingsBool output_format_parallel_formatting; } namespace MergeTreeSetting @@ -323,6 +331,7 @@ namespace ErrorCodes extern const int DATA_TYPE_CANNOT_BE_USED_IN_KEY; extern const int TOO_LARGE_LIGHTWEIGHT_UPDATES; extern const int UNKNOWN_TABLE; + extern const int FILE_ALREADY_EXISTS; } static void checkSuspiciousIndices(const ASTFunction * index_function) @@ -6231,9 +6240,15 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP part_name, getStorageID().getFullTableName()); { + MergeTreeExportManifest manifest( + dest_storage->getStorageID(), + part, + query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], + query_context->getSettingsRef()[Setting::output_format_parallel_formatting]); + std::lock_guard lock(export_manifests_mutex); - if (!export_manifests.emplace(dest_storage->getStorageID(), part).second) + if (!export_manifests.emplace(std::move(manifest)).second) { throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'", part_name, dest_storage->getStorageID().getFullTableName()); @@ -6247,24 +6262,6 @@ void MergeTreeData::exportPartToTableImpl( const MergeTreeExportManifest & manifest, ContextPtr local_context) { - std::function part_log_wrapper = [this, manifest](ImportStats stats) { - const auto & data_part = manifest.data_part; - - writePartLog( - PartLogElement::Type::EXPORT_PART, - stats.status, - stats.elapsed_ns, - data_part->name, - data_part, - {data_part}, - nullptr, - nullptr); - - std::lock_guard inner_lock(export_manifests_mutex); - - export_manifests.erase(manifest); - }; - auto metadata_snapshot = getInMemoryMetadataPtr(); Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, local_context); @@ -6291,17 +6288,31 @@ void MergeTreeData::exportPartToTableImpl( throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name); } - auto sink = destination_storage->import( - manifest.data_part->name, - block_with_partition_values, - local_context, - part_log_wrapper); + SinkToStoragePtr sink; + std::string destination_file_path; - /// Most likely the file has already been imported, so we can just return - if (!sink) + try { - std::lock_guard inner_lock(export_manifests_mutex); + auto context_copy = Context::createCopy(local_context); + context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); + sink = destination_storage->import( + manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), + block_with_partition_values, + destination_file_path, + manifest.overwrite_file_if_exists, + context_copy); + } + catch (const Exception & e) + { + if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS) + { + ProfileEvents::increment(ProfileEvents::PartsExportDuplicated); + } + + ProfileEvents::increment(ProfileEvents::PartsExportFailures); + + std::lock_guard inner_lock(export_manifests_mutex); export_manifests.erase(manifest); return; } @@ -6342,15 +6353,79 @@ void MergeTreeData::exportPartToTableImpl( local_context, getLogger("ExportPartition")); + auto exports_list_entry = getContext()->getExportsList().insert( + getStorageID(), + manifest.destination_storage_id, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->name, + destination_file_path, + manifest.data_part->rows_count, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->getBytesUncompressedOnDisk(), + manifest.create_time, + local_context); + + ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ""); + QueryPlanOptimizationSettings optimization_settings(local_context); auto pipeline_settings = BuildQueryPipelineSettings(local_context); auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); + + builder->setProgressCallback([&exports_list_entry](const Progress & progress) + { + (*exports_list_entry)->bytes_read_uncompressed += progress.read_bytes; + (*exports_list_entry)->rows_read += progress.read_rows; + (*exports_list_entry)->elapsed = (*exports_list_entry)->watch.elapsedSeconds(); + }); + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); pipeline.complete(sink); - CompletedPipelineExecutor exec(pipeline); - exec.execute(); + try + { + CompletedPipelineExecutor exec(pipeline); + exec.execute(); + + std::lock_guard inner_lock(export_manifests_mutex); + writePartLog( + PartLogElement::Type::EXPORT_PART, + {}, + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr, + exports_list_entry.get()); + + export_manifests.erase(manifest); + + ProfileEvents::increment(ProfileEvents::PartsExports); + ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast((*exports_list_entry)->elapsed * 1000)); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while exporting the part {}. User should retry.", manifest.data_part->name)); + + ProfileEvents::increment(ProfileEvents::PartsExportFailures); + + std::lock_guard inner_lock(export_manifests_mutex); + writePartLog( + PartLogElement::Type::EXPORT_PART, + ExecutionStatus::fromCurrentException("", true), + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr, + exports_list_entry.get()); + + export_manifests.erase(manifest); + + throw; + } } void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, ContextPtr /*query_context*/) @@ -9043,7 +9118,8 @@ void MergeTreeData::writePartLog( const DataPartPtr & result_part, const DataPartsVector & source_parts, const MergeListEntry * merge_entry, - std::shared_ptr profile_counters) + std::shared_ptr profile_counters, + const ExportsListEntry * exports_entry) try { auto table_id = getStorageID(); @@ -9111,6 +9187,13 @@ try part_log_elem.rows = (*merge_entry)->rows_written; part_log_elem.peak_memory_usage = (*merge_entry)->getMemoryTracker().getPeak(); } + else if (exports_entry) + { + part_log_elem.rows_read = (*exports_entry)->rows_read; + part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed; + part_log_elem.peak_memory_usage = (*exports_entry)->getPeakMemoryUsage(); + part_log_elem.path_on_disk = (*exports_entry)->destination_file_path; + } if (profile_counters) { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 6a8a7fc55fc6..7072b8d52e82 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -1629,7 +1630,8 @@ class MergeTreeData : public IStorage, public WithMutableContext const DataPartPtr & result_part, const DataPartsVector & source_parts, const MergeListEntry * merge_entry, - std::shared_ptr profile_counters); + std::shared_ptr profile_counters, + const ExportsListEntry * exports_entry = nullptr); /// If part is assigned to merge or mutation (possibly replicated) /// Should be overridden by children, because they can have different diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h index 89aed701f993..36831fd132ba 100644 --- a/src/Storages/MergeTree/MergeTreeExportManifest.h +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -8,9 +8,24 @@ struct MergeTreeExportManifest { using DataPartPtr = std::shared_ptr; + + MergeTreeExportManifest( + const StorageID & destination_storage_id_, + const DataPartPtr & data_part_, + bool overwrite_file_if_exists_, + bool parallel_formatting_) + : destination_storage_id(destination_storage_id_), + data_part(data_part_), + overwrite_file_if_exists(overwrite_file_if_exists_), + parallel_formatting(parallel_formatting_), + create_time(time(nullptr)) {} + StorageID destination_storage_id; DataPartPtr data_part; - time_t create_time = time(nullptr); + bool overwrite_file_if_exists; + bool parallel_formatting; + + time_t create_time; mutable bool in_progress = false; bool operator<(const MergeTreeExportManifest & rhs) const diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp deleted file mode 100644 index 82a1bbfc81ea..000000000000 --- a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp +++ /dev/null @@ -1,65 +0,0 @@ - -#include - -namespace DB -{ - -StorageObjectStorageImporterSink::StorageObjectStorageImporterSink( - const std::string & path_, - const ObjectStoragePtr & object_storage_, - const ConfigurationPtr & configuration_, - const std::optional & format_settings_, - const Block & sample_block_, - const std::function & part_log_, - const ContextPtr & context_) - : SinkToStorage(sample_block_) - , object_storage(object_storage_) - , configuration(configuration_) - , format_settings(format_settings_) - , sample_block(sample_block_) - , context(context_) - , part_log(part_log_) -{ - stats.file_path = path_; - sink = std::make_shared( - stats.file_path, - object_storage, - configuration, - format_settings, - sample_block, - context); -} - -String StorageObjectStorageImporterSink::getName() const -{ - return "StorageObjectStorageMergeTreePartImporterSink"; -} - -void StorageObjectStorageImporterSink::consume(Chunk & chunk) -{ - sink->consume(chunk); - stats.read_bytes += chunk.bytes(); - stats.read_rows += chunk.getNumRows(); -} - -void StorageObjectStorageImporterSink::onFinish() -{ - sink->onFinish(); - - if (const auto object_metadata = object_storage->tryGetObjectMetadata(stats.file_path)) - { - stats.bytes_on_disk = object_metadata->size_bytes; - } - - part_log(stats); -} - -void StorageObjectStorageImporterSink::onException(std::exception_ptr exception) -{ - sink->onException(exception); - - stats.status = ExecutionStatus(-1, "Error importing part"); - part_log(stats); -} - -} diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h deleted file mode 100644 index 051f5196f964..000000000000 --- a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -using DataPartPtr = std::shared_ptr; - -/* - * Wrapper around `StorageObjectsStorageSink` that takes care of accounting & metrics for partition export - */ -class StorageObjectStorageImporterSink : public SinkToStorage -{ -public: - using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; - - StorageObjectStorageImporterSink( - const std::string & path_, - const ObjectStoragePtr & object_storage_, - const ConfigurationPtr & configuration_, - const std::optional & format_settings_, - const Block & sample_block_, - const std::function & part_log_, - const ContextPtr & context_); - - String getName() const override; - - void consume(Chunk & chunk) override; - - void onFinish() override; - - void onException(std::exception_ptr exception) override; - -private: - std::shared_ptr sink; - ObjectStoragePtr object_storage; - ConfigurationPtr configuration; - std::optional format_settings; - Block sample_block; - ContextPtr context; - std::function part_log; - - IStorage::ImportStats stats; -}; - -} diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index ee780987f98e..393b29e7dad9 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -33,8 +33,6 @@ #include #include #include -#include -#include #include #include #include @@ -57,6 +55,7 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; extern const int LOGICAL_ERROR; extern const int INCORRECT_DATA; + extern const int FILE_ALREADY_EXISTS; } String StorageObjectStorage::getPathSample(ContextPtr context) @@ -487,8 +486,9 @@ bool StorageObjectStorage::supportsImport() const SinkToStoragePtr StorageObjectStorage::import( const std::string & file_name, Block & block_with_partition_values, - ContextPtr local_context, - std::function part_log) + std::string & destination_file_path, + bool overwrite_if_exists, + ContextPtr local_context) { std::string partition_key; @@ -502,23 +502,20 @@ SinkToStoragePtr StorageObjectStorage::import( } } - const auto file_path = configuration->getPathForWrite(partition_key, file_name).path; + destination_file_path = configuration->getPathForWrite(partition_key, file_name).path; - if (object_storage->exists(StoredObject(file_path))) + if (!overwrite_if_exists && object_storage->exists(StoredObject(destination_file_path))) { - LOG_INFO(getLogger("StorageObjectStorage"), "File {} already exists, skipping import", file_path); - return nullptr; + throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "File {} already exists", destination_file_path); } - - return std::make_shared( - file_path, + + return std::make_shared( + destination_file_path, object_storage, configuration, format_settings, getInMemoryMetadataPtr()->getSampleBlock(), - part_log, - local_context - ); + local_context); } void StorageObjectStorage::truncate( diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 4bc7c3e283b6..d99066f68b07 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -83,8 +83,9 @@ class StorageObjectStorage : public IStorage SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - ContextPtr /* context */, - std::function /* part_log */) override; + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, + ContextPtr /* context */) override; void truncate( const ASTPtr & query, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 415ee23d923f..3c586d53d419 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -256,13 +256,14 @@ bool StorageObjectStorageCluster::supportsImport() const SinkToStoragePtr StorageObjectStorageCluster::import( const std::string & file_name, Block & block_with_partition_values, - ContextPtr context, - std::function part_log) + std::string & destination_file_path, + bool overwrite_if_exists, + ContextPtr context) { if (pure_storage) - return pure_storage->import(file_name, block_with_partition_values, context, part_log); + return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context); - return IStorageCluster::import(file_name, block_with_partition_values, context, part_log); + return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 9a35791a6552..8a36235b99d1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -38,8 +38,9 @@ class StorageObjectStorageCluster : public IStorageCluster SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - ContextPtr /* context */, - std::function /* part_log */) override; + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, + ContextPtr /* context */) override; private: void updateQueryToSendIfNeeded( diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp index 8a25b4188426..979fa21708d6 100644 --- a/src/Storages/System/StorageSystemExports.cpp +++ b/src/Storages/System/StorageSystemExports.cpp @@ -1,11 +1,9 @@ +#include #include #include -#include -#include -#include "Columns/ColumnString.h" -#include "DataTypes/DataTypeString.h" -#include -#include "Storages/VirtualColumnUtils.h" +#include +#include +#include #include #include #include @@ -23,95 +21,45 @@ ColumnsDescription StorageSystemExports::getColumnsDescription() {"destination_database", std::make_shared(), "Name of the destination database."}, {"destination_table", std::make_shared(), "Name of the destination table."}, {"create_time", std::make_shared(), "Date and time when the export command was submitted for execution."}, - {"part_name", std::make_shared(), "Name of the part"} + {"part_name", std::make_shared(), "Name of the part"}, + {"destination_file_path", std::make_shared(), "File path where the part is being exported."}, + {"elapsed", std::make_shared(), "The time elapsed (in seconds) since the export started."}, + {"rows_read", std::make_shared(), "The number of rows read from the exported part."}, + {"total_rows_to_read", std::make_shared(), "The total number of rows to read from the exported part."}, + {"total_size_bytes_compressed", std::make_shared(), "The total size of the compressed data in the exported part."}, + {"total_size_bytes_uncompressed", std::make_shared(), "The total size of the uncompressed data in the exported part."}, + {"bytes_read_uncompressed", std::make_shared(), "The number of uncompressed bytes read from the exported part."}, + {"memory_usage", std::make_shared(), "Current memory usage in bytes for the export operation."}, + {"peak_memory_usage", std::make_shared(), "Peak memory usage in bytes during the export operation."}, }; } -void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector) const +void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector) const { const auto access = context->getAccess(); - const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES); - - /// Collect a set of *MergeTree tables. - std::map> merge_tree_tables; - for (const auto & db : DatabaseCatalog::instance().getDatabases()) - { - /// Check if database can contain MergeTree tables - if (!db.second->canContainMergeTreeTables()) - continue; - - const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); - - for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) - { - const auto & table = iterator->table(); - if (!table) - continue; + const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES); - if (!dynamic_cast(table.get())) - continue; - - if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) - continue; - - merge_tree_tables[db.first][iterator->name()] = table; - } - } - - MutableColumnPtr col_source_database_export = ColumnString::create(); - MutableColumnPtr col_source_table_export = ColumnString::create(); - - for (auto & db : merge_tree_tables) + for (const auto & export_info : context->getExportsList().get()) { - for (auto & table : db.second) - { - col_source_database_export->insert(db.first); - col_source_table_export->insert(table.first); - } - } - - ColumnPtr col_source_database = std::move(col_source_database_export); - ColumnPtr col_source_table = std::move(col_source_table_export); - - /// Determine what tables are needed by the conditions in the query. - { - Block filtered_block - { - { col_source_database, std::make_shared(), "source_database" }, - { col_source_table, std::make_shared(), "source_table" }, - }; - - VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context); - - if (!filtered_block.rows()) - return; - - col_source_database = filtered_block.getByName("source_database").column; - col_source_table = filtered_block.getByName("source_table").column; - } - - for (size_t i_storage = 0; i_storage < col_source_database->size(); ++i_storage) - { - auto database = (*col_source_database)[i_storage].safeGet(); - auto table = (*col_source_table)[i_storage].safeGet(); - - std::vector statuses; - { - const IStorage * storage = merge_tree_tables[database][table].get(); - if (const auto * merge_tree = dynamic_cast(storage)) - statuses = merge_tree->getExportsStatus(); - } + if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, export_info.source_database, export_info.source_table)) + continue; - for (const MergeTreeExportStatus & status : statuses) - { - size_t col_num = 0; - res_columns[col_num++]->insert(status.source_database); - res_columns[col_num++]->insert(status.source_table); - res_columns[col_num++]->insert(status.destination_database); - res_columns[col_num++]->insert(status.destination_table); - res_columns[col_num++]->insert(status.create_time); - res_columns[col_num++]->insert(status.part_name); - } + size_t i = 0; + res_columns[i++]->insert(export_info.source_database); + res_columns[i++]->insert(export_info.source_table); + res_columns[i++]->insert(export_info.destination_database); + res_columns[i++]->insert(export_info.destination_table); + res_columns[i++]->insert(export_info.create_time); + res_columns[i++]->insert(export_info.part_name); + res_columns[i++]->insert(export_info.destination_file_path); + res_columns[i++]->insert(export_info.elapsed); + res_columns[i++]->insert(export_info.rows_read); + res_columns[i++]->insert(export_info.total_rows_to_read); + res_columns[i++]->insert(export_info.total_size_bytes_compressed); + res_columns[i++]->insert(export_info.total_size_bytes_uncompressed); + res_columns[i++]->insert(export_info.bytes_read_uncompressed); + res_columns[i++]->insert(export_info.memory_usage); + res_columns[i++]->insert(export_info.peak_memory_usage); } } diff --git a/src/Storages/System/StorageSystemMerges.cpp b/src/Storages/System/StorageSystemMerges.cpp index 0fca5dc84a2b..c8c569ff4696 100644 --- a/src/Storages/System/StorageSystemMerges.cpp +++ b/src/Storages/System/StorageSystemMerges.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include #include diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index e8abd22b97d1..5a3a4d30599d 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -209,6 +210,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b attach(context, system_database, "dimensional_metrics", "Contains dimensional metrics, which have multiple dimensions (labels) to provide more granular information. For example, counting failed merges by their error code. This table is always up to date."); attach(context, system_database, "merges", "Contains a list of merges currently executing merges of MergeTree tables and their progress. Each merge operation is represented by a single row."); attach(context, system_database, "moves", "Contains information about in-progress data part moves of MergeTree tables. Each data part movement is represented by a single row."); + attach(context, system_database, "exports", "Contains a list of exports currently executing exports of MergeTree tables and their progress. Each export operation is represented by a single row."); attach(context, system_database, "mutations", "Contains a list of mutations and their progress. Each mutation command is represented by a single row."); attachNoDescription(context, system_database, "replicas", "Contains information and status of all table replicas on current server. Each replica is represented by a single row."); attach(context, system_database, "replication_queue", "Contains information about tasks from replication queues stored in ClickHouse Keeper, or ZooKeeper, for each table replica."); diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference index 75c32111f7a3..d9089d37dd99 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference @@ -1,14 +1,14 @@ ---- Export 2020_1_1_0 and 2021_2_2_0 ---- Both data parts should appear -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3 -test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4 +1 2020 +2 2020 +3 2020 +4 2021 ---- Export the same part again, it should be idempotent -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3 -test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4 +1 2020 +2 2020 +3 2020 +4 2021 ---- Data in roundtrip MergeTree table (should match s3_table) 1 2020 2 2020 diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh index 5e886eaf4755..7b3e9d3bb3d1 100755 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh @@ -23,16 +23,16 @@ query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTING query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" echo "---- Both data parts should appear" -query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id" +query "SELECT * FROM $s3_table ORDER BY id" echo "---- Export the same part again, it should be idempotent" query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" -query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id" +query "SELECT * FROM $s3_table ORDER BY id" query "CREATE TABLE $mt_table_roundtrip ENGINE = MergeTree() PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table" echo "---- Data in roundtrip MergeTree table (should match s3_table)" -query "SELECT DISTINCT ON (id) * FROM $mt_table_roundtrip ORDER BY id" +query "SELECT * FROM $s3_table ORDER BY id" query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip" From ea5a8ab34b7fe8646fb52006b6fcb08a403dfb4c Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 29 Sep 2025 12:36:20 -0300 Subject: [PATCH 04/14] try to fix build --- src/Storages/ObjectStorage/StorageObjectStorage.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 393b29e7dad9..2b82f96cc676 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -689,9 +689,5 @@ void StorageObjectStorage::checkAlterIsPossible(const AlterCommands & commands, configuration->checkAlterIsPossible(commands); } -StorageObjectStorage::Configuration::Path StorageObjectStorage::Configuration::getPathForWrite(const std::string & partition_id, const std::string & filename_override) const -{ - return Path {file_path_generator->getPathForWrite(partition_id, filename_override)}; } -} From b9ca655364b24ea2665c2f16cd03390ba42e0bbc Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 29 Sep 2025 12:37:14 -0300 Subject: [PATCH 05/14] try to fix build 2 --- .../ObjectStorage/StorageObjectStorageConfiguration.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp index 4951aa317e3e..23af37a07c5d 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp @@ -117,7 +117,7 @@ void StorageObjectStorageConfiguration::initialize( else FormatFactory::instance().checkFormatName(configuration_to_initialize.format); - configuration_to_initialize.read_path = file_path_generator->getPathForRead(); + configuration_to_initialize.read_path = configuration_to_initialize.file_path_generator->getPathForRead(); configuration_to_initialize.initialized = true; } From 0a31eabff35d67c4431466d0825f0fe67fcf7167 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 13 Oct 2025 14:06:53 -0300 Subject: [PATCH 06/14] Update ASTAlterQuery.cpp --- src/Parsers/ASTAlterQuery.cpp | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index 7006e1bdb8f7..30b5f9dca156 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -360,8 +360,7 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett } else if (type == ASTAlterCommand::EXPORT_PART) { - ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << "PART" - << (settings.hilite ? hilite_none : ""); + ostr << "EXPORT PART "; partition->format(ostr, settings, state, frame); ostr << " TO "; switch (move_destination_type) @@ -370,11 +369,9 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett ostr << "TABLE "; if (!to_database.empty()) { - ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database) - << (settings.hilite ? hilite_none : "") << "."; + ostr << backQuoteIfNeed(to_database) << "."; } - ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table) - << (settings.hilite ? hilite_none : ""); + ostr << backQuoteIfNeed(to_table); return; default: break; From 2df9bbbb2f1c3123214a595fccad866d089f61ad Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 13 Oct 2025 15:01:13 -0300 Subject: [PATCH 07/14] make some fields public --- .../ObjectStorage/StorageObjectStorageConfiguration.h | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h index aeb2db971b35..ec9c9c10b9ab 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h @@ -274,15 +274,16 @@ class StorageObjectStorageConfiguration return false; } -private: - String format = "auto"; - String compression_method = "auto"; - String structure = "auto"; PartitionStrategyFactory::StrategyType partition_strategy_type = PartitionStrategyFactory::StrategyType::NONE; + std::shared_ptr partition_strategy; /// Whether partition column values are contained in the actual data. /// And alternative is with hive partitioning, when they are contained in file path. bool partition_columns_in_data_file = true; - std::shared_ptr partition_strategy; + +private: + String format = "auto"; + String compression_method = "auto"; + String structure = "auto"; protected: bool initialized = false; From f0124217600d545c587bea85fa9b79f19e549601 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 13 Oct 2025 21:48:00 -0300 Subject: [PATCH 08/14] partially fix build issue --- .../ObjectStorage/StorageObjectStorage.cpp | 2 +- .../StorageObjectStorageConfiguration.cpp | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 385bb9730f50..7e02b2a1add8 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -501,7 +501,7 @@ SinkToStoragePtr StorageObjectStorage::import( object_storage, configuration, format_settings, - getInMemoryMetadataPtr()->getSampleBlock(), + std::make_shared(getInMemoryMetadataPtr()->getSampleBlock()), local_context); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp index 7db31fde0af1..967c9b0525a1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp @@ -88,15 +88,15 @@ void StorageObjectStorageConfiguration::initialize( } } - if (configuration_to_initialize.partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE) + if (partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE) { - configuration_to_initialize.file_path_generator = std::make_shared( - configuration_to_initialize.getRawPath().path, - configuration_to_initialize.format); + file_path_generator = std::make_shared( + getRawPath().path, + format); } else { - configuration_to_initialize.file_path_generator = std::make_shared(configuration_to_initialize.getRawPath().path); + file_path_generator = std::make_shared(getRawPath().path); } if (format == "auto") @@ -116,7 +116,7 @@ void StorageObjectStorageConfiguration::initialize( else FormatFactory::instance().checkFormatName(format); - configuration_to_initialize.read_path = configuration_to_initialize.file_path_generator->getPathForRead(); + read_path = file_path_generator->getPathForRead(); initialized = true; } @@ -149,7 +149,7 @@ StorageObjectStorageConfiguration::Path StorageObjectStorageConfiguration::getPa return getPathForWrite(partition_id, /* filename_override */ ""); } -StorageObjectStorage::Configuration::Path StorageObjectStorage::Configuration::getPathForWrite(const std::string & partition_id, const std::string & filename_override) const +StorageObjectStorageConfiguration::Path StorageObjectStorageConfiguration::getPathForWrite(const std::string & partition_id, const std::string & filename_override) const { return Path {file_path_generator->getPathForWrite(partition_id, filename_override)}; } From 59800647b4c175a2ec00b2b00a7ff4e2fc4137f4 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 14 Oct 2025 10:04:04 -0300 Subject: [PATCH 09/14] one more change --- src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp index 967c9b0525a1..f030b31c6a3e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp @@ -134,7 +134,6 @@ void StorageObjectStorageConfiguration::initPartitionStrategy(ASTPtr partition_b if (partition_strategy) { - read_path = partition_strategy->getPathForRead(getRawPath().path); LOG_DEBUG(getLogger("StorageObjectStorageConfiguration"), "Initialized partition strategy {}", magic_enum::enum_name(partition_strategy_type)); } } From 4230781cca9c1cb4dae2b296d00e6aa88150448d Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 14 Oct 2025 10:41:13 -0300 Subject: [PATCH 10/14] one more fix --- src/Interpreters/Context.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 54f6b5d38cb3..c9f6e9ad84b6 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -158,6 +158,8 @@ namespace ProfileEvents extern const Event BackupThrottlerSleepMicroseconds; extern const Event MergesThrottlerBytes; extern const Event MergesThrottlerSleepMicroseconds; + extern const Event ExportsThrottlerBytes; + extern const Event ExportsThrottlerSleepMicroseconds; extern const Event MutationsThrottlerBytes; extern const Event MutationsThrottlerSleepMicroseconds; extern const Event QueryLocalReadThrottlerBytes; From 99781c6564e84f7aeafce29c789a2152fde14f9d Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 14 Oct 2025 11:25:37 -0300 Subject: [PATCH 11/14] some more fix attempts --- src/Storages/StorageFile.cpp | 2 +- src/Storages/StorageURL.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index a8d29ff96365..28e9da937032 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -2111,7 +2111,7 @@ SinkToStoragePtr StorageFile::write( partition_strategy, sink_creator, context, - metadata_snapshot->getSampleBlock() + std::make_shared(metadata_snapshot->getSampleBlock()) ); } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 4f45c220cb11..5c80831421f4 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -1459,7 +1459,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad headers, http_method); - return std::make_shared(partition_strategy, sink_creator, context, metadata_snapshot->getSampleBlock()); + return std::make_shared(partition_strategy, sink_creator, context, std::make_shared(metadata_snapshot->getSampleBlock())); } return std::make_shared( From 08cec669e4231a67424bbab8139154745a23efac Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 14 Oct 2025 11:41:23 -0300 Subject: [PATCH 12/14] updt --- src/Storages/MergeTree/ExportList.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp index 63ecc6d7c1f1..0239f841dc69 100644 --- a/src/Storages/MergeTree/ExportList.cpp +++ b/src/Storages/MergeTree/ExportList.cpp @@ -24,7 +24,7 @@ ExportsListElement::ExportsListElement( , total_size_bytes_uncompressed(total_size_bytes_uncompressed_) , create_time(create_time_) { - thread_group = ThreadGroup::createForBackgroundProcess(context); + thread_group = ThreadGroup::createForMergeMutate(context); } ExportsListElement::~ExportsListElement() From 0828fe9df2b28c66468323adb85eeadeebb4e9ba Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 14 Oct 2025 13:21:49 -0300 Subject: [PATCH 13/14] settings change histr --- src/Core/SettingsChangesHistory.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index a1c90c2ff58a..9d210f7ad16d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -47,6 +47,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, {"allow_retries_in_cluster_requests", false, false, "New setting"}, + {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.8", { From 4e10deafd24ea7749fd0dc3a1105374a5502e7f2 Mon Sep 17 00:00:00 2001 From: Vasily Nemkov Date: Tue, 14 Oct 2025 23:57:37 +0200 Subject: [PATCH 14/14] Excluded export tests from fast test run --- .../03572_export_merge_tree_part_to_object_storage.sh | 1 + .../03572_export_merge_tree_part_to_object_storage_simple.sql | 2 +- ...03572_export_replicated_merge_tree_part_to_object_storage.sh | 2 +- ...port_replicated_merge_tree_part_to_object_storage_simple.sql | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh index 7b3e9d3bb3d1..1b7efb2d0379 100755 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh @@ -1,4 +1,5 @@ #!/usr/bin/env bash +# Tags: no-fasttest CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql index 136b12142383..a61c066e8789 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql @@ -1,4 +1,4 @@ --- Tags: no-parallel +-- Tags: no-parallel, no-fasttest DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table; diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh index 3b955d4bbbe5..4709faf6c2e4 100755 --- a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh @@ -1,5 +1,5 @@ #!/usr/bin/env bash -# Tags: replica, no-parallel, no-replicated-database +# Tags: replica, no-parallel, no-replicated-database, no-fasttest CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql index e0aa82532190..f8f23532f0a7 100644 --- a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql @@ -1,4 +1,4 @@ --- Tags: no-parallel +-- Tags: no-parallel, no-fasttest DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table;