diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index ccdc21f9cbce..de3f2e18136b 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -74,6 +74,7 @@ enum class AccessType : uint8_t enabled implicitly by the grant ALTER_TABLE */\ M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING, RESET SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\ M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \ + M(ALTER_EXPORT_PART, "ALTER EXPORT PART, EXPORT PART", TABLE, ALTER_TABLE) \ M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \ M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \ M(ALTER_UNLOCK_SNAPSHOT, "UNLOCK SNAPSHOT", TABLE, ALTER_TABLE) \ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 92a949f28009..c0713ff55f76 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -138,6 +138,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/Azure) add_headers_and_sources(dbms Storages/ObjectStorage/S3) add_headers_and_sources(dbms Storages/ObjectStorage/HDFS) add_headers_and_sources(dbms Storages/ObjectStorage/Local) +add_headers_and_sources(dbms Storages/ObjectStorage/MergeTree) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg) add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/DeltaLake) diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index c73cbfce17f2..77eadea11873 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -99,6 +99,7 @@ namespace DB DECLARE(UInt64, max_unexpected_parts_loading_thread_pool_size, 8, R"(The number of threads to load inactive set of data parts (Unexpected ones) at startup.)", 0) \ DECLARE(UInt64, max_parts_cleaning_thread_pool_size, 128, R"(The number of threads for concurrent removal of inactive data parts.)", 0) \ DECLARE(UInt64, max_mutations_bandwidth_for_server, 0, R"(The maximum read speed of all mutations on server in bytes per second. Zero means unlimited.)", 0) \ + DECLARE(UInt64, max_exports_bandwidth_for_server, 0, R"(The maximum read speed of all exports on server in bytes per second. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_merges_bandwidth_for_server, 0, R"(The maximum read speed of all merges on server in bytes per second. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.)", 0) \ DECLARE(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, R"(The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.)", 0) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 76fdaa95c68d..1d3ffebafe75 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6895,6 +6895,9 @@ Execute request to object storage as remote on one of object_storage_cluster nod DECLARE_WITH_ALIAS(Bool, allow_experimental_time_series_aggregate_functions, false, R"( Experimental timeSeries* aggregate functions for Prometheus-like timeseries resampling, rate, delta calculation. )", EXPERIMENTAL, allow_experimental_ts_to_grid_aggregate_function) \ + DECLARE_WITH_ALIAS(Bool, allow_experimental_export_merge_tree_part, false, R"( +Allow the experimental ALTER TABLE ... EXPORT PART ... TO TABLE query, which exports a MergeTree data part to an external table (e.g. object storage or a data lake).
+)", EXPERIMENTAL, allow_experimental_export_merge_tree_part) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 85a3965af357..d62fdc92bf5d 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -77,6 +77,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster", "", "", "New setting"}, {"object_storage_max_nodes", 0, 0, "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, + {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 5f876807fa10..d115835dd28b 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -2110,7 +2110,8 @@ bool DatabaseReplicated::shouldReplicateQuery(const ContextPtr & query_context, if (const auto * alter = query_ptr->as()) { if (alter->isAttachAlter() || alter->isFetchAlter() || alter->isDropPartitionAlter() - || is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot()) + || is_keeper_map_table(query_ptr) || alter->isFreezeAlter() || alter->isUnlockSnapshot() + || alter->isExportPartAlter()) return false; if (has_many_shards() || !is_replicated_table(query_ptr)) diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index e4b1d48771d4..12f041234c46 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -138,6 +138,8 @@ struct RelativePathWithMetadata virtual ~RelativePathWithMetadata() = default; virtual std::string getFileName() const { return std::filesystem::path(relative_path).filename(); } + virtual std::string getFileNameWithoutExtension() const { return std::filesystem::path(relative_path).stem(); } + virtual std::string getPath() const { return relative_path; } virtual bool isArchive() const { return false; } virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index a9b02aa69622..6bc374c48c1e 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -289,6 +289,7 @@ namespace ServerSetting extern const ServerSettingsUInt64 max_local_write_bandwidth_for_server; extern const ServerSettingsUInt64 max_merges_bandwidth_for_server; extern const ServerSettingsUInt64 max_mutations_bandwidth_for_server; + extern const ServerSettingsUInt64 max_exports_bandwidth_for_server; extern const ServerSettingsUInt64 max_remote_read_network_bandwidth_for_server; extern const ServerSettingsUInt64 max_remote_write_network_bandwidth_for_server; extern const ServerSettingsUInt64 max_replicated_fetches_network_bandwidth_for_server; @@ -505,6 +506,8 @@ struct ContextSharedPart : boost::noncopyable mutable ThrottlerPtr mutations_throttler; /// A server-wide throttler for mutations mutable ThrottlerPtr merges_throttler; /// A server-wide throttler for merges + mutable ThrottlerPtr exports_throttler; /// A server-wide throttler for exports + MultiVersion macros; /// Substitutions extracted from config. std::unique_ptr ddl_worker TSA_GUARDED_BY(mutex); /// Process ddl commands from zk. 
LoadTaskPtr ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup @@ -996,6 +999,9 @@ struct ContextSharedPart : boost::noncopyable if (auto bandwidth = server_settings[ServerSetting::max_merges_bandwidth_for_server]) merges_throttler = std::make_shared(bandwidth); + + if (auto bandwidth = server_settings[ServerSetting::max_exports_bandwidth_for_server]) + exports_throttler = std::make_shared(bandwidth); } }; @@ -4048,6 +4054,11 @@ ThrottlerPtr Context::getMergesThrottler() const return shared->merges_throttler; } +ThrottlerPtr Context::getExportsThrottler() const +{ + return shared->exports_throttler; +} + void Context::reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const { if (read_bandwidth) diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index bc97a6d03c02..14c61ca2f176 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -1646,6 +1646,7 @@ class Context: public ContextData, public std::enable_shared_from_this ThrottlerPtr getMutationsThrottler() const; ThrottlerPtr getMergesThrottler() const; + ThrottlerPtr getExportsThrottler() const; void reloadRemoteThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const; void reloadLocalThrottlerConfig(size_t read_bandwidth, size_t write_bandwidth) const; diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index ac50addfedf4..0ce10e0b385b 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -747,7 +747,8 @@ bool DDLWorker::taskShouldBeExecutedOnLeader(const ASTPtr & ast_ddl, const Stora alter->isFreezeAlter() || alter->isUnlockSnapshot() || alter->isMovePartitionToDiskOrVolumeAlter() || - alter->isCommentAlter()) + alter->isCommentAlter() || + alter->isExportPartAlter()) return false; } diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 262b4ce13cfd..83a8691cf4ee 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -502,6 +502,12 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS required_access.emplace_back(AccessType::ALTER_DELETE | AccessType::INSERT, database, table); break; } + case ASTAlterCommand::EXPORT_PART: + { + required_access.emplace_back(AccessType::ALTER_EXPORT_PART, database, table); + required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table); + break; + } case ASTAlterCommand::FETCH_PARTITION: { required_access.emplace_back(AccessType::ALTER_FETCH_PARTITION, database, table); diff --git a/src/Interpreters/PartLog.cpp b/src/Interpreters/PartLog.cpp index 02c6f6e573b0..aca3b4cf6870 100644 --- a/src/Interpreters/PartLog.cpp +++ b/src/Interpreters/PartLog.cpp @@ -69,6 +69,7 @@ ColumnsDescription PartLogElement::getColumnsDescription() {"MovePart", static_cast(MOVE_PART)}, {"MergePartsStart", static_cast(MERGE_PARTS_START)}, {"MutatePartStart", static_cast(MUTATE_PART_START)}, + {"ExportPart", static_cast(EXPORT_PART)}, } ); @@ -109,7 +110,8 @@ ColumnsDescription PartLogElement::getColumnsDescription() "RemovePart — Removing or detaching a data part using [DETACH PARTITION](/sql-reference/statements/alter/partition#detach-partitionpart)." "MutatePartStart — Mutating of a data part has started, " "MutatePart — Mutating of a data part has finished, " - "MovePart — Moving the data part from the one disk to another one."}, + "MovePart — Moving the data part from the one disk to another one." 
+ "ExportPart — Exporting the data part from a MergeTree table into a target table that represents external storage (e.g., object storage or a data lake).."}, {"merge_reason", std::move(merge_reason_datatype), "The reason for the event with type MERGE_PARTS. Can have one of the following values: " "NotAMerge — The current event has the type other than MERGE_PARTS, " diff --git a/src/Interpreters/PartLog.h b/src/Interpreters/PartLog.h index 44d2fb413c5f..4f58069dae55 100644 --- a/src/Interpreters/PartLog.h +++ b/src/Interpreters/PartLog.h @@ -30,6 +30,7 @@ struct PartLogElement MOVE_PART = 6, MERGE_PARTS_START = 7, MUTATE_PART_START = 8, + EXPORT_PART = 9, }; /// Copy of MergeAlgorithm since values are written to disk. diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index f7dc9355a116..6bbcafae7b3d 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -53,6 +53,8 @@ bool isSupportedAlterTypeForOnClusterDDLQuery(int type) ASTAlterCommand::ATTACH_PARTITION, /// Usually followed by ATTACH PARTITION ASTAlterCommand::FETCH_PARTITION, + /// Data operation that should be executed locally on each replica + ASTAlterCommand::EXPORT_PART, /// Logical error ASTAlterCommand::NO_TYPE, }; diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index cdf8b558fd61..78ba7f14d645 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -355,6 +355,29 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett ostr << quoteString(move_destination_name); } } + else if (type == ASTAlterCommand::EXPORT_PART) + { + ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << "PART" + << (settings.hilite ? hilite_none : ""); + partition->format(ostr, settings, state, frame); + ostr << " TO "; + switch (move_destination_type) + { + case DataDestinationType::TABLE: + ostr << "TABLE "; + if (!to_database.empty()) + { + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database) + << (settings.hilite ? hilite_none : "") << "."; + } + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table) + << (settings.hilite ? hilite_none : ""); + return; + default: + break; + } + + } else if (type == ASTAlterCommand::REPLACE_PARTITION) { ostr << (settings.hilite ? hilite_keyword : "") << (replace ? "REPLACE" : "ATTACH") << " PARTITION " @@ -624,6 +647,11 @@ bool ASTAlterQuery::isMovePartitionToDiskOrVolumeAlter() const return false; } +bool ASTAlterQuery::isExportPartAlter() const +{ + return isOneCommandTypeOnly(ASTAlterCommand::EXPORT_PART); +} + /** Get the text that identifies this element. 
*/ String ASTAlterQuery::getID(char delim) const diff --git a/src/Parsers/ASTAlterQuery.h b/src/Parsers/ASTAlterQuery.h index 3867a86cf797..d8d502cb87c6 100644 --- a/src/Parsers/ASTAlterQuery.h +++ b/src/Parsers/ASTAlterQuery.h @@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST FREEZE_ALL, UNFREEZE_PARTITION, UNFREEZE_ALL, + EXPORT_PART, DELETE, UPDATE, @@ -263,6 +264,8 @@ class ASTAlterQuery : public ASTQueryWithTableAndOutput, public ASTQueryWithOnCl bool isMovePartitionToDiskOrVolumeAlter() const; + bool isExportPartAlter() const; + bool isCommentAlter() const; String getID(char) const override; diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index e4db7beb9d4e..f0ba30ff1c6b 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -331,6 +331,7 @@ namespace DB MR_MACROS(MONTHS, "MONTHS") \ MR_MACROS(MOVE_PART, "MOVE PART") \ MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ + MR_MACROS(EXPORT_PART, "EXPORT PART") \ MR_MACROS(MOVE, "MOVE") \ MR_MACROS(MS, "MS") \ MR_MACROS(MUTATION, "MUTATION") \ diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index 2c127e6ff1e1..289385a20eb2 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION); ParserKeyword s_move_partition(Keyword::MOVE_PARTITION); ParserKeyword s_move_part(Keyword::MOVE_PART); + ParserKeyword s_export_part(Keyword::EXPORT_PART); ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION); ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART); ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION); @@ -535,6 +536,23 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected command->move_destination_name = ast_space_name->as().value.safeGet(); } + else if (s_export_part.ignore(pos, expected)) + { + if (!parser_string_and_substituion.parse(pos, command_partition, expected)) + return false; + + command->type = ASTAlterCommand::EXPORT_PART; + command->part = true; + + if (!s_to_table.ignore(pos, expected)) + { + return false; + } + + if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table)) + return false; + command->move_destination_type = DataDestinationType::TABLE; + } else if (s_move_partition.ignore(pos, expected)) { if (!parser_partition.parse(pos, command_partition, expected)) diff --git a/src/Storages/IPartitionStrategy.cpp b/src/Storages/IPartitionStrategy.cpp index 6acf9c9a3a73..0e6ef5c3a5ae 100644 --- a/src/Storages/IPartitionStrategy.cpp +++ b/src/Storages/IPartitionStrategy.cpp @@ -256,19 +256,6 @@ ColumnPtr WildcardPartitionStrategy::computePartitionKey(const Chunk & chunk) return block_with_partition_by_expr.getByName(actions_with_column_name.column_name).column; } -std::string WildcardPartitionStrategy::getPathForRead( - const std::string & prefix) -{ - return prefix; -} - -std::string WildcardPartitionStrategy::getPathForWrite( - const std::string & prefix, - const std::string & partition_key) -{ - return PartitionedSink::replaceWildcards(prefix, partition_key); -} - HiveStylePartitionStrategy::HiveStylePartitionStrategy( KeyDescription partition_key_description_, const Block & sample_block_, @@ -288,41 +275,6 @@ HiveStylePartitionStrategy::HiveStylePartitionStrategy( block_without_partition_columns = buildBlockWithoutPartitionColumns(sample_block, partition_columns_name_set); } 
-std::string HiveStylePartitionStrategy::getPathForRead(const std::string & prefix) -{ - return prefix + "**." + Poco::toLower(file_format); -} - -std::string HiveStylePartitionStrategy::getPathForWrite( - const std::string & prefix, - const std::string & partition_key) -{ - std::string path; - - if (!prefix.empty()) - { - path += prefix; - if (path.back() != '/') - { - path += '/'; - } - } - - /// Not adding '/' because buildExpressionHive() always adds a trailing '/' - path += partition_key; - - /* - * File extension is toLower(format) - * This isn't ideal, but I guess multiple formats can be specified and introduced. - * So I think it is simpler to keep it this way. - * - * Or perhaps implement something like `IInputFormat::getFileExtension()` - */ - path += std::to_string(generateSnowflakeID()) + "." + Poco::toLower(file_format); - - return path; -} - ColumnPtr HiveStylePartitionStrategy::computePartitionKey(const Chunk & chunk) { Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); diff --git a/src/Storages/IPartitionStrategy.h b/src/Storages/IPartitionStrategy.h index bc90d7f03461..606122b4ae71 100644 --- a/src/Storages/IPartitionStrategy.h +++ b/src/Storages/IPartitionStrategy.h @@ -29,8 +29,12 @@ struct IPartitionStrategy virtual ColumnPtr computePartitionKey(const Chunk & chunk) = 0; - virtual std::string getPathForRead(const std::string & prefix) = 0; - virtual std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) = 0; + ColumnPtr computePartitionKey(Block & block) const + { + actions_with_column_name.actions->execute(block); + + return block.getByName(actions_with_column_name.column_name).column; + } virtual ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) { @@ -53,6 +57,7 @@ struct IPartitionStrategy const KeyDescription partition_key_description; const Block sample_block; ContextPtr context; + PartitionExpressionActionsAndColumnName actions_with_column_name; }; /* @@ -89,11 +94,6 @@ struct WildcardPartitionStrategy : IPartitionStrategy WildcardPartitionStrategy(KeyDescription partition_key_description_, const Block & sample_block_, ContextPtr context_); ColumnPtr computePartitionKey(const Chunk & chunk) override; - std::string getPathForRead(const std::string & prefix) override; - std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override; - -private: - PartitionExpressionActionsAndColumnName actions_with_column_name; }; /* @@ -111,8 +111,6 @@ struct HiveStylePartitionStrategy : IPartitionStrategy bool partition_columns_in_data_file_); ColumnPtr computePartitionKey(const Chunk & chunk) override; - std::string getPathForRead(const std::string & prefix) override; - std::string getPathForWrite(const std::string & prefix, const std::string & partition_key) override; ColumnRawPtrs getFormatChunkColumns(const Chunk & chunk) override; Block getFormatHeader() override; @@ -121,7 +119,6 @@ struct HiveStylePartitionStrategy : IPartitionStrategy const std::string file_format; const bool partition_columns_in_data_file; std::unordered_set partition_columns_name_set; - PartitionExpressionActionsAndColumnName actions_with_column_name; Block block_without_partition_columns; }; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 028fdd61a26f..c86c06ca64a3 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -439,6 +439,31 @@ class IStorage : public std::enable_shared_from_this, public TypePromo ContextPtr /*context*/, bool /*async_insert*/); + virtual bool 
supportsImport() const + { + return false; + } + + struct ImportStats + { + ExecutionStatus status; + std::size_t elapsed_ns = 0; + std::size_t bytes_on_disk = 0; + std::size_t read_rows = 0; + std::size_t read_bytes = 0; + std::string file_path = ""; + }; + + virtual SinkToStoragePtr import( + const std::string & /* file_name */, + Block & /* block_with_partition_values */, + ContextPtr /* context */, + std::function /* stats_log */) + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); + } + + /** Writes the data to a table in distributed manner. * It is supposed that implementation looks into SELECT part of the query and executes distributed * INSERT SELECT if it is possible with current storage as a receiver and query SELECT part as a producer. diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 29f22dde5b5a..77604d5de286 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -23,6 +23,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -191,6 +195,8 @@ namespace Setting extern const SettingsUInt64 parts_to_throw_insert; extern const SettingsBool enable_shared_storage_snapshot_in_query; extern const SettingsUInt64 merge_tree_storage_snapshot_sleep_ms; + extern const SettingsBool allow_experimental_export_merge_tree_part; + extern const SettingsUInt64 min_bytes_to_use_direct_io; } namespace MergeTreeSetting @@ -302,6 +308,7 @@ namespace ErrorCodes extern const int LIMIT_EXCEEDED; extern const int CANNOT_FORGET_PARTITION; extern const int DATA_TYPE_CANNOT_BE_USED_IN_KEY; + extern const int UNKNOWN_TABLE; } static void checkSuspiciousIndices(const ASTFunction * index_function) @@ -4241,8 +4248,6 @@ void MergeTreeData::changeSettings( { if (new_settings) { - bool has_storage_policy_changed = false; - const auto & new_changes = new_settings->as().changes; StoragePolicyPtr new_storage_policy = nullptr; @@ -4281,8 +4286,6 @@ void MergeTreeData::changeSettings( disk->createDirectories(fs::path(relative_data_path) / DETACHED_DIR_NAME); } /// FIXME how would that be done while reloading configuration??? - - has_storage_policy_changed = true; } } } @@ -4299,9 +4302,6 @@ void MergeTreeData::changeSettings( StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); new_metadata.setSettingsChanges(new_settings); setInMemoryMetadata(new_metadata); - - if (has_storage_policy_changed) - startBackgroundMovesIfNeeded(); } } @@ -5889,6 +5889,170 @@ void MergeTreeData::movePartitionToTable(const PartitionCommand & command, Conte movePartitionToTable(dest_storage, command.partition, query_context); } +void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextPtr query_context) +{ + if (!query_context->getSettingsRef()[Setting::allow_experimental_export_merge_tree_part]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "Exporting merge tree part is experimental. 
Set `allow_experimental_export_merge_tree_part` to enable it"); + } + + String dest_database = query_context->resolveDatabase(command.to_database); + auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); + + if (dest_storage->getStorageID() == this->getStorageID()) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Exporting to the same table is not allowed"); + } + + if (!dest_storage->supportsImport()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Destination storage {} does not support MergeTree parts or uses unsupported partitioning", dest_storage->getName()); + + auto query_to_string = [] (const ASTPtr & ast) + { + return ast ? ast->formatWithSecretsOneLine() : ""; + }; + + auto src_snapshot = getInMemoryMetadataPtr(); + auto destination_snapshot = dest_storage->getInMemoryMetadataPtr(); + + if (destination_snapshot->getColumns().getAllPhysical().sizeOfDifference(src_snapshot->getColumns().getAllPhysical())) + throw Exception(ErrorCodes::INCOMPATIBLE_COLUMNS, "Tables have different structure"); + + if (query_to_string(src_snapshot->getPartitionKeyAST()) != query_to_string(destination_snapshot->getPartitionKeyAST())) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Tables have different partition key"); + + auto part_name = command.partition->as().value.safeGet(); + + auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated}); + + if (!part) + throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No such data part '{}' to export in table '{}'", + part_name, getStorageID().getFullTableName()); + + { + std::lock_guard lock(export_manifests_mutex); + + if (!export_manifests.emplace(dest_storage->getStorageID(), part).second) + { + throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'", + part_name, dest_storage->getStorageID().getFullTableName()); + } + } + + background_moves_assignee.trigger(); +} + +void MergeTreeData::exportPartToTableImpl( + const MergeTreeExportManifest & manifest, + ContextPtr local_context) +{ + std::function part_log_wrapper = [this, manifest](ImportStats stats) { + const auto & data_part = manifest.data_part; + + writePartLog( + PartLogElement::Type::EXPORT_PART, + stats.status, + stats.elapsed_ns, + data_part->name, + data_part, + {data_part}, + nullptr, + nullptr); + + std::lock_guard inner_lock(export_manifests_mutex); + + export_manifests.erase(manifest); + }; + + auto metadata_snapshot = getInMemoryMetadataPtr(); + Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); + StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, local_context); + + MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export; + + NamesAndTypesList partition_columns; + if (metadata_snapshot->hasPartitionKey()) + { + const auto & partition_key = metadata_snapshot->getPartitionKey(); + if (!partition_key.column_names.empty()) + partition_columns = partition_key.expression->getRequiredColumnsWithTypes(); + } + + auto block_with_partition_values = manifest.data_part->partition.getBlockWithPartitionValues(partition_columns); + + auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, getContext()); + if (!destination_storage) + { + std::lock_guard inner_lock(export_manifests_mutex); + + const auto destination_storage_id_name = manifest.destination_storage_id.getNameForLogs(); + export_manifests.erase(manifest); + throw 
Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name); + } + + auto sink = destination_storage->import( + manifest.data_part->name, + block_with_partition_values, + local_context, + part_log_wrapper); + + /// Most likely the file has already been imported, so we can just return + if (!sink) + { + std::lock_guard inner_lock(export_manifests_mutex); + + export_manifests.erase(manifest); + return; + } + + bool apply_deleted_mask = true; + /// Use direct I/O only if the part is at least min_bytes_to_use_direct_io bytes; zero disables direct I/O + const UInt64 min_bytes_to_use_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io]; + bool read_with_direct_io = min_bytes_to_use_direct_io > 0 && manifest.data_part->getBytesOnDisk() >= min_bytes_to_use_direct_io; + bool prefetch = false; + + MergeTreeData::IMutationsSnapshot::Params params + { + .metadata_version = metadata_snapshot->getMetadataVersion(), + .min_part_metadata_version = manifest.data_part->getMetadataVersion(), + }; + + auto mutations_snapshot = getMutationsSnapshot(params); + + auto alter_conversions = MergeTreeData::getAlterConversionsForPart( + manifest.data_part, + mutations_snapshot, + local_context); + + QueryPlan plan_for_part; + + createReadFromPartStep( + read_type, + plan_for_part, + *this, + storage_snapshot, + RangesInDataPart(manifest.data_part), + alter_conversions, + nullptr, + columns_to_read, + nullptr, + apply_deleted_mask, + std::nullopt, + read_with_direct_io, + prefetch, + local_context, + getLogger("ExportPartition")); + + QueryPlanOptimizationSettings optimization_settings(local_context); + auto pipeline_settings = BuildQueryPipelineSettings(local_context); + auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + + pipeline.complete(sink); + + CompletedPipelineExecutor exec(pipeline); + exec.execute(); +} + void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, ContextPtr /*query_context*/) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "MOVE PARTITION TO SHARD is not supported by storage {}", getName()); @@ -5940,6 +6104,11 @@ Pipe MergeTreeData::alterPartition( } } break; + case PartitionCommand::EXPORT_PART: + { + exportPartToTable(command, query_context); + break; + } case PartitionCommand::DROP_DETACHED_PARTITION: dropDetached(command.partition, command.part, query_context); @@ -8157,6 +8326,32 @@ std::pair MergeTreeData::cloneAn return std::make_pair(dst_data_part, std::move(temporary_directory_lock)); } +std::vector MergeTreeData::getExportsStatus() const +{ + std::lock_guard lock(export_manifests_mutex); + std::vector result; + + auto source_database = getStorageID().database_name; + auto source_table = getStorageID().table_name; + + for (const auto & manifest : export_manifests) + { + MergeTreeExportStatus status; + + status.source_database = source_database; + status.source_table = source_table; + status.destination_database = manifest.destination_storage_id.database_name; + status.destination_table = manifest.destination_storage_id.table_name; + status.create_time = manifest.create_time; + status.part_name = manifest.data_part->name; + + result.emplace_back(std::move(status)); + } + + return result; +} + + bool MergeTreeData::canUseAdaptiveGranularity() const { const auto settings = getSettings(); @@ -8551,21 +8746,43 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger() bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee) { - if (parts_mover.moves_blocker.isCancelled()) - return false; + if 
(!parts_mover.moves_blocker.isCancelled()) + { + auto moving_tagger = selectPartsForMove(); + if (!moving_tagger->parts_to_move.empty()) + { + assignee.scheduleMoveTask(std::make_shared( + [this, moving_tagger] () mutable + { + ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); + WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); + return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; + }, moves_assignee_trigger, getStorageID())); + return true; + } + } - auto moving_tagger = selectPartsForMove(); - if (moving_tagger->parts_to_move.empty()) - return false; + std::lock_guard lock(export_manifests_mutex); - assignee.scheduleMoveTask(std::make_shared( - [this, moving_tagger] () mutable + for (auto & manifest : export_manifests) + { + if (manifest.in_progress) { - ReadSettings read_settings = Context::getGlobalContextInstance()->getReadSettings(); - WriteSettings write_settings = Context::getGlobalContextInstance()->getWriteSettings(); - return moveParts(moving_tagger, read_settings, write_settings, /* wait_for_move_if_zero_copy= */ false) == MovePartsOutcome::PartsMoved; - }, moves_assignee_trigger, getStorageID())); - return true; + continue; + } + + manifest.in_progress = assignee.scheduleMoveTask(std::make_shared( + [this, manifest] () mutable { + exportPartToTableImpl(manifest, getContext()); + return true; + }, + moves_assignee_trigger, + getStorageID())); + + return manifest.in_progress; + } + + return false; } bool MergeTreeData::areBackgroundMovesNeeded() const @@ -8783,6 +9000,10 @@ bool MergeTreeData::canUsePolymorphicParts() const return canUsePolymorphicParts(*getSettings(), unused); } +void MergeTreeData::startBackgroundMoves() +{ + background_moves_assignee.start(); +} void MergeTreeData::checkDropCommandDoesntAffectInProgressMutations(const AlterCommand & command, const std::map & unfinished_mutations, ContextPtr local_context) const { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 89d3507266e0..7e148a7d0539 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -34,6 +34,8 @@ #include #include #include +#include +#include #include #include @@ -901,6 +903,12 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Moves partition to specified Table void movePartitionToTable(const PartitionCommand & command, ContextPtr query_context); + void exportPartToTable(const PartitionCommand & command, ContextPtr query_context); + + void exportPartToTableImpl( + const MergeTreeExportManifest & manifest, + ContextPtr local_context); + /// Checks that Partition could be dropped right now /// Otherwise - throws an exception with detailed information. /// We do not use mutex because it is not very important that the size could change during the operation. 
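A note on the scheduling logic above: scheduleDataMovingJob now first tries to schedule a regular part move and, failing that, scans export_manifests under export_manifests_mutex, scheduling at most one pending export per pass; the mutable in_progress flag prevents the same manifest from being scheduled twice. A minimal standalone sketch of that pattern, with illustrative Manifest and Scheduler types rather than the real ClickHouse classes:

```cpp
// Sketch of the "schedule at most one pending manifest per pass" pattern,
// assuming nothing from ClickHouse. Manifest and Scheduler are stand-ins.
#include <functional>
#include <iostream>
#include <mutex>
#include <set>
#include <string>

struct Manifest
{
    std::string destination;
    std::string part_name;
    mutable bool in_progress = false;  /// mutable: flipped while the element sits in a std::set

    bool operator<(const Manifest & rhs) const
    {
        if (destination != rhs.destination)
            return destination < rhs.destination;
        return part_name < rhs.part_name;
    }
};

class Scheduler
{
public:
    void enqueue(Manifest m)
    {
        std::lock_guard lock(mutex);
        manifests.insert(std::move(m));
    }

    /// Mirrors the shape of scheduleDataMovingJob: returns true if a task was scheduled.
    bool scheduleOne(const std::function<bool(const Manifest &)> & schedule_task)
    {
        std::lock_guard lock(mutex);
        for (const auto & manifest : manifests)
        {
            if (manifest.in_progress)
                continue;
            manifest.in_progress = schedule_task(manifest);
            return manifest.in_progress;
        }
        return false;
    }

private:
    std::mutex mutex;
    std::set<Manifest> manifests;
};

int main()
{
    Scheduler scheduler;
    scheduler.enqueue({"s3_events", "all_1_1_0"});
    bool scheduled = scheduler.scheduleOne([](const Manifest & m)
    {
        std::cout << "exporting " << m.part_name << " -> " << m.destination << '\n';
        return true;  /// pretend the background pool accepted the task
    });
    std::cout << (scheduled ? "scheduled" : "nothing to do") << '\n';
}
```

Returning after the first pending manifest keeps each scheduler pass cheap; remaining manifests are picked up on later passes when the assignee is triggered again.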
@@ -964,6 +972,7 @@ class MergeTreeData : public IStorage, public WithMutableContext bool must_on_same_disk); virtual std::vector getMutationsStatus() const = 0; + std::vector getExportsStatus() const; /// Returns true if table can create new parts with adaptive granularity /// Has additional constraint in replicated version @@ -1144,6 +1153,10 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Mutex for currently_moving_parts mutable std::mutex moving_parts_mutex; + mutable std::mutex export_manifests_mutex; + + std::set export_manifests; + PinnedPartUUIDsPtr getPinnedPartUUIDs() const; /// Schedules background job to like merge/mutate/fetch an executor @@ -1255,6 +1268,8 @@ class MergeTreeData : public IStorage, public WithMutableContext are_columns_and_secondary_indices_sizes_calculated = false; } + void startBackgroundMoves(); + /// Engine-specific methods BrokenPartCallback broken_part_callback; @@ -1699,8 +1714,6 @@ class MergeTreeData : public IStorage, public WithMutableContext bool canUsePolymorphicParts(const MergeTreeSettings & settings, String & out_reason) const; - virtual void startBackgroundMovesIfNeeded() = 0; - bool allow_nullable_key = false; bool allow_reverse_key = false; diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h new file mode 100644 index 000000000000..89aed701f993 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -0,0 +1,35 @@ +#include +#include + +namespace DB +{ + +struct MergeTreeExportManifest +{ + using DataPartPtr = std::shared_ptr; + + StorageID destination_storage_id; + DataPartPtr data_part; + time_t create_time = time(nullptr); + mutable bool in_progress = false; + + bool operator<(const MergeTreeExportManifest & rhs) const + { + // Lexicographic comparison: first compare destination storage, then part name + auto lhs_storage = destination_storage_id.getQualifiedName(); + auto rhs_storage = rhs.destination_storage_id.getQualifiedName(); + + if (lhs_storage != rhs_storage) + return lhs_storage < rhs_storage; + + return data_part->name < rhs.data_part->name; + } + + bool operator==(const MergeTreeExportManifest & rhs) const + { + return destination_storage_id.getQualifiedName() == rhs.destination_storage_id.getQualifiedName() + && data_part->name == rhs.data_part->name; + } +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeExportStatus.h b/src/Storages/MergeTree/MergeTreeExportStatus.h new file mode 100644 index 000000000000..e71a2f15e6ed --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeExportStatus.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +struct MergeTreeExportStatus +{ + String source_database; + String source_table; + String destination_database; + String destination_table; + time_t create_time = 0; + std::string part_name; +}; + +} diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index 445fc8846da3..19c33cbca5ed 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -479,6 +479,22 @@ void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Bl } } +Block MergeTreePartition::getBlockWithPartitionValues(const NamesAndTypesList & partition_columns) const +{ + chassert(partition_columns.size() == value.size()); + + Block result; + + std::size_t i = 0; + for (const auto & partition_column : partition_columns) + { + auto column = 
partition_column.type->createColumnConst(1, value[i++]); + result.insert({column, partition_column.type, partition_column.name}); + } + + return result; +} + NamesAndTypesList MergeTreePartition::executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context) { auto adjusted_partition_key = adjustPartitionKey(metadata_snapshot, context); diff --git a/src/Storages/MergeTree/MergeTreePartition.h b/src/Storages/MergeTree/MergeTreePartition.h index 4338b216cdb8..811cfdc2a90c 100644 --- a/src/Storages/MergeTree/MergeTreePartition.h +++ b/src/Storages/MergeTree/MergeTreePartition.h @@ -60,6 +60,8 @@ struct MergeTreePartition void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context); + Block getBlockWithPartitionValues(const NamesAndTypesList & partition_columns) const; + /// Adjust partition key and execute its expression on block. Return sample block according to used expression. static NamesAndTypesList executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context); diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index 9335e08fa4c2..d74a04744a11 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -140,6 +140,9 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( case Merge: read_settings.local_throttler = context->getMergesThrottler(); break; + case Export: + read_settings.local_throttler = context->getExportsThrottler(); + break; } read_settings.remote_throttler = read_settings.local_throttler; diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h index abba230d9e79..a858adf33bb5 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.h +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h @@ -15,6 +15,7 @@ enum MergeTreeSequentialSourceType { Mutation, Merge, + Export, }; /// Create stream for reading single part from MergeTree. 
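The two new files below (StorageObjectStorageImporterSink and ObjectStorageFilePathGenerator) carry the destination side of the export: the sink wraps StorageObjectStorageSink to account bytes and rows, and the generators decide where an exported part lands. A minimal sketch of the append-style ("hive") path construction they implement, with a plain string standing in for the generated snowflake ID and pathForWrite as an illustrative free function, not the real API:

```cpp
// Sketch of append-style path construction: prefix / hive partition key /
// file name . format. The real generator lower-cases the format and falls
// back to a snowflake ID when no file name override is given.
#include <iostream>
#include <string>

std::string pathForWrite(
    std::string prefix,                 /// e.g. "bucket/table_root"
    const std::string & partition_key,  /// e.g. "year=2024/month=5/" (hive style, trailing '/')
    const std::string & file_name,      /// e.g. the exported part name "all_1_1_0"
    const std::string & file_format)    /// e.g. "parquet", already lower-cased
{
    if (!prefix.empty() && prefix.back() != '/')
        prefix += '/';
    /// The partition key already ends with '/', so only the file name follows.
    return prefix + partition_key + file_name + "." + file_format;
}

int main()
{
    std::cout << pathForWrite("bucket/table_root", "year=2024/month=5/", "all_1_1_0", "parquet") << '\n';
    /// -> bucket/table_root/year=2024/month=5/all_1_1_0.parquet
}
```

Passing the part name as the file name override is what makes exports idempotent: StorageObjectStorage::import recomputes the same path on a retry and skips the upload when the object already exists.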
diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp new file mode 100644 index 000000000000..82a1bbfc81ea --- /dev/null +++ b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp @@ -0,0 +1,65 @@ + +#include + +namespace DB +{ + +StorageObjectStorageImporterSink::StorageObjectStorageImporterSink( + const std::string & path_, + const ObjectStoragePtr & object_storage_, + const ConfigurationPtr & configuration_, + const std::optional & format_settings_, + const Block & sample_block_, + const std::function & part_log_, + const ContextPtr & context_) + : SinkToStorage(sample_block_) + , object_storage(object_storage_) + , configuration(configuration_) + , format_settings(format_settings_) + , sample_block(sample_block_) + , context(context_) + , part_log(part_log_) +{ + stats.file_path = path_; + sink = std::make_shared( + stats.file_path, + object_storage, + configuration, + format_settings, + sample_block, + context); +} + +String StorageObjectStorageImporterSink::getName() const +{ + return "StorageObjectStorageMergeTreePartImporterSink"; +} + +void StorageObjectStorageImporterSink::consume(Chunk & chunk) +{ + sink->consume(chunk); + stats.read_bytes += chunk.bytes(); + stats.read_rows += chunk.getNumRows(); +} + +void StorageObjectStorageImporterSink::onFinish() +{ + sink->onFinish(); + + if (const auto object_metadata = object_storage->tryGetObjectMetadata(stats.file_path)) + { + stats.bytes_on_disk = object_metadata->size_bytes; + } + + part_log(stats); +} + +void StorageObjectStorageImporterSink::onException(std::exception_ptr exception) +{ + sink->onException(exception); + + stats.status = ExecutionStatus(-1, "Error importing part"); + part_log(stats); +} + +} diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h new file mode 100644 index 000000000000..051f5196f964 --- /dev/null +++ b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +using DataPartPtr = std::shared_ptr; + +/* + * Wrapper around `StorageObjectsStorageSink` that takes care of accounting & metrics for partition export + */ +class StorageObjectStorageImporterSink : public SinkToStorage +{ +public: + using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; + + StorageObjectStorageImporterSink( + const std::string & path_, + const ObjectStoragePtr & object_storage_, + const ConfigurationPtr & configuration_, + const std::optional & format_settings_, + const Block & sample_block_, + const std::function & part_log_, + const ContextPtr & context_); + + String getName() const override; + + void consume(Chunk & chunk) override; + + void onFinish() override; + + void onException(std::exception_ptr exception) override; + +private: + std::shared_ptr sink; + ObjectStoragePtr object_storage; + ConfigurationPtr configuration; + std::optional format_settings; + Block sample_block; + ContextPtr context; + std::function part_log; + + IStorage::ImportStats stats; +}; + +} diff --git a/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h new file mode 100644 index 000000000000..1ed503cbf3c6 --- /dev/null +++ b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h @@ -0,0 
+1,79 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + struct ObjectStorageFilePathGenerator + { + virtual ~ObjectStorageFilePathGenerator() = default; + std::string getPathForWrite(const std::string & partition_id) const { + return getPathForWrite(partition_id, ""); + } + virtual std::string getPathForWrite(const std::string & partition_id, const std::string & /* file_name_override */) const = 0; + virtual std::string getPathForRead() const = 0; + }; + + struct ObjectStorageWildcardFilePathGenerator : ObjectStorageFilePathGenerator + { + explicit ObjectStorageWildcardFilePathGenerator(const std::string & raw_path_) : raw_path(raw_path_) {} + + using ObjectStorageFilePathGenerator::getPathForWrite; // Bring base class overloads into scope + std::string getPathForWrite(const std::string & partition_id, const std::string & /* file_name_override */) const override + { + return PartitionedSink::replaceWildcards(raw_path, partition_id); + } + + std::string getPathForRead() const override + { + return raw_path; + } + + private: + std::string raw_path; + + }; + + struct ObjectStorageAppendFilePathGenerator : ObjectStorageFilePathGenerator + { + explicit ObjectStorageAppendFilePathGenerator( + const std::string & raw_path_, + const std::string & file_format_) + : raw_path(raw_path_), file_format(Poco::toLower(file_format_)){} + + using ObjectStorageFilePathGenerator::getPathForWrite; // Bring base class overloads into scope + std::string getPathForWrite(const std::string & partition_id, const std::string & file_name_override) const override + { + std::string result; + + result += raw_path; + + if (raw_path.back() != '/') + { + result += "/"; + } + + /// Not adding '/' because buildExpressionHive() always adds a trailing '/' + result += partition_id; + + const auto file_name = file_name_override.empty() ? std::to_string(generateSnowflakeID()) : file_name_override; + + result += file_name + "." + file_format; + + return result; + } + + std::string getPathForRead() const override + { + return raw_path + "**." 
+ file_format; + } + + private: + std::string raw_path; + std::string file_format; + }; + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index bd46d8412649..1b0d322840fd 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -26,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -424,7 +426,8 @@ SinkToStoragePtr StorageObjectStorage::write( if (configuration->getPartitionStrategy()) { - return std::make_shared(object_storage, configuration, format_settings, sample_block, local_context); + auto sink_creator = std::make_shared(object_storage, configuration, format_settings, sample_block, local_context); + return std::make_shared(configuration->partition_strategy, sink_creator, local_context, sample_block); } auto paths = configuration->getPaths(); @@ -443,6 +446,48 @@ SinkToStoragePtr StorageObjectStorage::write( local_context); } +bool StorageObjectStorage::supportsImport() const +{ + return configuration->partition_strategy != nullptr && configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE; +} + +SinkToStoragePtr StorageObjectStorage::import( + const std::string & file_name, + Block & block_with_partition_values, + ContextPtr local_context, + std::function part_log) +{ + std::string partition_key; + + if (configuration->partition_strategy) + { + const auto column_with_partition_key = configuration->partition_strategy->computePartitionKey(block_with_partition_values); + + if (!column_with_partition_key->empty()) + { + partition_key = column_with_partition_key->getDataAt(0).toString(); + } + } + + const auto file_path = configuration->getPathForWrite(partition_key, file_name).path; + + if (object_storage->exists(StoredObject(file_path))) + { + LOG_INFO(getLogger("StorageObjectStorage"), "File {} already exists, skipping import", file_path); + return nullptr; + } + + return std::make_shared( + file_path, + object_storage, + configuration, + format_settings, + getInMemoryMetadataPtr()->getSampleBlock(), + part_log, + local_context + ); +} + void StorageObjectStorage::truncate( const ASTPtr & /* query */, const StorageMetadataPtr & /* metadata_snapshot */, @@ -603,6 +648,17 @@ void StorageObjectStorage::Configuration::initialize( setPartitionStrategyType(PartitionStrategyFactory::StrategyType::WILDCARD); } + if (partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE) + { + file_path_generator = std::make_shared( + getRawPath().path, + format); + } + else + { + file_path_generator = std::make_shared(getRawPath().path); + } + if (format == "auto") { if (isDataLakeConfiguration()) @@ -620,8 +676,7 @@ void StorageObjectStorage::Configuration::initialize( else FormatFactory::instance().checkFormatName(format); - /// It might be changed on `StorageObjectStorage::Configuration::initPartitionStrategy` - read_path = getRawPath(); + read_path = file_path_generator->getPathForRead(); initialized = true; } @@ -639,7 +694,6 @@ void StorageObjectStorage::Configuration::initPartitionStrategy(ASTPtr partition if (partition_strategy) { - read_path = partition_strategy->getPathForRead(getRawPath().path); LOG_DEBUG(getLogger("StorageObjectStorageConfiguration"), "Initialized partition strategy {}", magic_enum::enum_name(partition_strategy_type)); } } @@ -651,16 +705,13 @@ const StorageObjectStorage::Configuration::Path & 
StorageObjectStorage::Configur StorageObjectStorage::Configuration::Path StorageObjectStorage::Configuration::getPathForWrite(const std::string & partition_id) const { - auto raw_path = getRawPath(); - - if (!partition_strategy) - { - return raw_path; - } - - return Path {partition_strategy->getPathForWrite(raw_path.path, partition_id)}; + return getPathForWrite(partition_id, /* filename_override */ ""); } +StorageObjectStorage::Configuration::Path StorageObjectStorage::Configuration::getPathForWrite(const std::string & partition_id, const std::string & filename_override) const +{ + return Path {file_path_generator->getPathForWrite(partition_id, filename_override)}; +} bool StorageObjectStorage::Configuration::Path::hasPartitionWildcard() const { diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 721cab5b8c6a..14c83e722948 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -7,6 +7,7 @@ #include #include #include +#include "Storages/ObjectStorage/ObjectStorageFilePathGenerator.h" #include #include #include @@ -99,6 +100,15 @@ class StorageObjectStorage : public IStorage ContextPtr context, bool async_insert) override; + + bool supportsImport() const override; + + SinkToStoragePtr import( + const std::string & /* file_name */, + Block & /* block_with_partition_values */, + ContextPtr /* context */, + std::function /* part_log */) override; + void truncate( const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, @@ -228,6 +238,8 @@ class StorageObjectStorage::Configuration // Path used for writing, it should not be globbed and might contain a partition key virtual Path getPathForWrite(const std::string & partition_id) const; + Path getPathForWrite(const std::string & partition_id, const std::string & filename_override) const; + virtual void setPathForRead(const Path & path) { read_path = path; @@ -356,7 +368,6 @@ class StorageObjectStorage::Configuration virtual void assertInitialized() const; -private: String format = "auto"; String compression_method = "auto"; String structure = "auto"; @@ -373,6 +384,7 @@ class StorageObjectStorage::Configuration // Path used for reading, by default it is the same as `getRawPath` // When using `partition_strategy=hive`, a recursive reading pattern will be appended `'table_root/**.parquet' Path read_path; + std::shared_ptr file_path_generator; }; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index ed34199adb07..d5f0ce7e1330 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -150,6 +150,11 @@ StorageObjectStorageCluster::StorageObjectStorageCluster( metadata.setColumns(columns); metadata.setConstraints(constraints_); + if (configuration->getPartitionStrategy()) + { + metadata.partition_key = configuration->getPartitionStrategy()->getPartitionKeyDescription(); + } + setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(metadata.columns)); setInMemoryMetadata(metadata); @@ -597,4 +602,24 @@ IDataLakeMetadata * StorageObjectStorageCluster::getExternalMetadata(ContextPtr return configuration->getExternalMetadata(); } +bool StorageObjectStorageCluster::supportsImport() const +{ + if (pure_storage) + return pure_storage->supportsImport(); + return false; +} + +SinkToStoragePtr StorageObjectStorageCluster::import( + const std::string 
& file_name,
+    Block & block_with_partition_values,
+    ContextPtr context,
+    std::function part_log)
+{
+    if (pure_storage)
+        return pure_storage->import(file_name, block_with_partition_values, context, part_log);
+
+    return IStorageCluster::import(file_name, block_with_partition_values, context, part_log);
+}
+
 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
index 2cceaa7fa6b2..41d7e7e7d867 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h
@@ -58,6 +58,14 @@ class StorageObjectStorageCluster : public IStorageCluster
 
     StorageMetadataPtr getInMemoryMetadataPtr() const override;
 
+    bool supportsImport() const override;
+
+    SinkToStoragePtr import(
+        const std::string & /* file_name */,
+        Block & /* block_with_partition_values */,
+        ContextPtr /* context */,
+        std::function /* part_log */) override;
+
 private:
     void updateQueryToSendIfNeeded(
         ASTPtr & query,
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp
index 283721783f7a..9f341f34e6f6 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp
@@ -133,8 +133,7 @@ PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
     std::optional<FormatSettings> format_settings_,
     const Block & sample_block_,
     ContextPtr context_)
-    : PartitionedSink(configuration_->getPartitionStrategy(), context_, sample_block_)
-    , object_storage(object_storage_)
+    : object_storage(object_storage_)
     , configuration(configuration_)
     , query_settings(configuration_->getQuerySettings(context_))
     , format_settings(format_settings_)
@@ -167,7 +166,7 @@ SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String
         object_storage,
         configuration,
         format_settings,
-        partition_strategy->getFormatHeader(),
+        configuration->partition_strategy->getFormatHeader(),
         context
     );
 }
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.h b/src/Storages/ObjectStorage/StorageObjectStorageSink.h
index ebfee5ab96e6..22625e0428e1 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSink.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.h
@@ -8,6 +8,8 @@ namespace DB
 {
 class StorageObjectStorageSink : public SinkToStorage
 {
+friend class StorageObjectStorageImporterSink;
+
 public:
     using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
 
@@ -38,7 +40,7 @@ class StorageObjectStorageSink : public SinkToStorage
     void cancelBuffers();
 };
 
-class PartitionedStorageObjectStorageSink : public PartitionedSink
+class PartitionedStorageObjectStorageSink : public PartitionedSink::SinkCreator
 {
 public:
     using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
diff --git a/src/Storages/PartitionCommands.cpp b/src/Storages/PartitionCommands.cpp
index 41cc03552458..05fb3a639bb8 100644
--- a/src/Storages/PartitionCommands.cpp
+++ b/src/Storages/PartitionCommands.cpp
@@ -130,6 +130,16 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
         res.with_name = command_ast->with_name;
         return res;
     }
+    if (command_ast->type == ASTAlterCommand::EXPORT_PART)
+    {
+        PartitionCommand res;
+        res.type = EXPORT_PART;
+        res.partition = command_ast->partition->clone();
+        res.part = command_ast->part;
+        res.to_database = command_ast->to_database;
+        res.to_table = command_ast->to_table;
+        return res;
+    }
     return {};
 }
 
@@ -171,6 +181,8 @@ std::string PartitionCommand::typeToString() const
             return "UNFREEZE ALL";
         case PartitionCommand::Type::REPLACE_PARTITION:
             return "REPLACE PARTITION";
+        case PartitionCommand::Type::EXPORT_PART:
+            return "EXPORT PART";
         default:
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Uninitialized partition command");
     }
diff --git a/src/Storages/PartitionCommands.h b/src/Storages/PartitionCommands.h
index 917e510f24b4..15d2a7fb869f 100644
--- a/src/Storages/PartitionCommands.h
+++ b/src/Storages/PartitionCommands.h
@@ -33,6 +33,7 @@ struct PartitionCommand
         UNFREEZE_ALL_PARTITIONS,
         UNFREEZE_PARTITION,
         REPLACE_PARTITION,
+        EXPORT_PART,
     };
 
     Type type = UNKNOWN;
diff --git a/src/Storages/PartitionedSink.cpp b/src/Storages/PartitionedSink.cpp
index ec43e4b4ca1f..3ed9bb74f264 100644
--- a/src/Storages/PartitionedSink.cpp
+++ b/src/Storages/PartitionedSink.cpp
@@ -22,10 +22,12 @@ namespace ErrorCodes
 
 PartitionedSink::PartitionedSink(
     std::shared_ptr partition_strategy_,
+    std::shared_ptr<SinkCreator> sink_creator_,
     ContextPtr context_,
     const Block & sample_block_)
     : SinkToStorage(sample_block_)
     , partition_strategy(partition_strategy_)
+    , sink_creator(sink_creator_)
     , context(context_)
     , sample_block(sample_block_)
 {
@@ -37,7 +39,7 @@ SinkPtr PartitionedSink::getSinkForPartitionKey(StringRef partition_key)
     auto it = partition_id_to_sink.find(partition_key);
     if (it == partition_id_to_sink.end())
     {
-        auto sink = createSinkForPartition(partition_key.toString());
+        auto sink = sink_creator->createSinkForPartition(partition_key.toString());
         std::tie(it, std::ignore) = partition_id_to_sink.emplace(partition_key, sink);
     }
 
diff --git a/src/Storages/PartitionedSink.h b/src/Storages/PartitionedSink.h
index 481230792db0..71df252aa174 100644
--- a/src/Storages/PartitionedSink.h
+++ b/src/Storages/PartitionedSink.h
@@ -17,10 +17,17 @@ namespace DB
 class PartitionedSink : public SinkToStorage
 {
 public:
+    struct SinkCreator
+    {
+        virtual ~SinkCreator() = default;
+        virtual SinkPtr createSinkForPartition(const String & partition_id) = 0;
+    };
+
     static constexpr auto PARTITION_ID_WILDCARD = "{_partition_id}";
 
     PartitionedSink(
         std::shared_ptr partition_strategy_,
+        std::shared_ptr<SinkCreator> sink_creator_,
         ContextPtr context_,
         const Block & sample_block_);
 
@@ -34,16 +41,15 @@ class PartitionedSink : public SinkToStorage
 
     void onFinish() override;
 
-    virtual SinkPtr createSinkForPartition(const String & partition_id) = 0;
-
     static void validatePartitionKey(const String & str, bool allow_slash);
 
     static String replaceWildcards(const String & haystack, const String & partition_id);
 
+protected:
     std::shared_ptr partition_strategy;
 
-private:
+    std::shared_ptr<SinkCreator> sink_creator;
     ContextPtr context;
     Block sample_block;
 
diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp
index 48c11e8d8e9e..5156af8c4edc 100644
--- a/src/Storages/StorageFile.cpp
+++ b/src/Storages/StorageFile.cpp
@@ -1936,7 +1936,7 @@ class StorageFileSink final : public SinkToStorage, WithContext
     std::unique_lock lock;
 };
 
-class PartitionedStorageFileSink : public PartitionedSink
+class PartitionedStorageFileSink : public PartitionedSink::SinkCreator
 {
 public:
     PartitionedStorageFileSink(
@@ -1951,7 +1951,7 @@ class PartitionedStorageFileSink : public PartitionedSink
         const String format_name_,
         ContextPtr context_,
         int flags_)
-        : PartitionedSink(partition_strategy_, context_, metadata_snapshot_->getSampleBlock())
+        : partition_strategy(partition_strategy_)
         , path(path_)
         , metadata_snapshot(metadata_snapshot_)
         , table_name_for_log(table_name_for_log_)
@@ -1967,11 +1967,12 @@ class PartitionedStorageFileSink : public PartitionedSink
     SinkPtr createSinkForPartition(const String & partition_id) override
     {
-        std::string filepath = partition_strategy->getPathForWrite(path, partition_id);
+        const auto file_path_generator = std::make_shared(path);
+        std::string filepath = file_path_generator->getPathForWrite(partition_id);
 
         fs::create_directories(fs::path(filepath).parent_path());
 
-        validatePartitionKey(filepath, true);
+        PartitionedSink::validatePartitionKey(filepath, true);
         checkCreationIsAllowed(context, context->getUserFilesPath(), filepath, /*can_be_directory=*/ true);
         return std::make_shared<StorageFileSink>(
             metadata_snapshot,
@@ -1988,6 +1989,7 @@ class PartitionedStorageFileSink : public PartitionedSink
     }
 
 private:
+    std::shared_ptr partition_strategy;
     const String path;
     StorageMetadataPtr metadata_snapshot;
     String table_name_for_log;
@@ -2039,7 +2041,7 @@ SinkToStoragePtr StorageFile::write(
             has_wildcards,
             /* partition_columns_in_data_file */true);
 
-        return std::make_shared<PartitionedStorageFileSink>(
+        auto sink_creator = std::make_shared<PartitionedStorageFileSink>(
             partition_strategy,
             metadata_snapshot,
             getStorageID().getNameForLogs(),
@@ -2051,6 +2053,13 @@ SinkToStoragePtr StorageFile::write(
             format_name,
             context,
             flags);
+
+        return std::make_shared<PartitionedSink>(
+            partition_strategy,
+            sink_creator,
+            context,
+            metadata_snapshot->getSampleBlock()
+        );
     }
 
     String path;
@@ -2076,6 +2085,7 @@ SinkToStoragePtr StorageFile::write(
         String new_path;
         do
         {
+            new_path = path.substr(0, pos) + "." + std::to_string(index) + (pos == std::string::npos ? "" : path.substr(pos));
             ++index;
         }
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index 716733224dbf..cce98e8dc0a1 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -47,6 +47,7 @@
 #include
 #include "Core/BackgroundSchedulePool.h"
 #include "Core/Names.h"
+#include "Parsers/ASTLiteral.h"
 
 namespace DB
 {
@@ -109,6 +110,7 @@ namespace ErrorCodes
     extern const int TABLE_IS_READ_ONLY;
     extern const int TOO_MANY_PARTS;
     extern const int PART_IS_LOCKED;
+    extern const int INCOMPATIBLE_COLUMNS;
 }
 
 namespace ActionLocks
@@ -199,7 +201,7 @@ void StorageMergeTree::startup()
     try
     {
         background_operations_assignee.start();
-        startBackgroundMovesIfNeeded();
+        startBackgroundMoves();
         startOutdatedAndUnexpectedDataPartsLoadingTask();
     }
     catch (...)
@@ -2689,12 +2691,6 @@ MutationCounters StorageMergeTree::getMutationCounters() const
     return mutation_counters;
 }
 
-void StorageMergeTree::startBackgroundMovesIfNeeded()
-{
-    if (areBackgroundMovesNeeded())
-        background_moves_assignee.start();
-}
-
 std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const
 {
     return std::make_unique<MergeTreeSettings>(getContext()->getMergeTreeSettings());
diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h
index 36ac29f2918b..31daf8d5a853 100644
--- a/src/Storages/StorageMergeTree.h
+++ b/src/Storages/StorageMergeTree.h
@@ -271,8 +271,6 @@ class StorageMergeTree final : public MergeTreeData
     void fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock);
     void fillNewPartNameAndResetLevel(MutableDataPartPtr & part, DataPartsLock & lock);
 
-    void startBackgroundMovesIfNeeded() override;
-
     BackupEntries backupMutations(UInt64 version, const String & data_path_in_backup) const;
 
     /// Attaches restored parts to the storage.
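Taken together, the PartitionCommands plumbing and the SinkCreator refactoring above wire up the new statement end to end. A minimal usage sketch (table names are illustrative; the syntax and the experimental gate are the ones exercised by the tests at the end of this patch):

ALTER TABLE source_mt EXPORT PART '2020_1_1_0' TO TABLE dest_s3
SETTINGS allow_experimental_export_merge_tree_part = 1;

The part name follows the usual MergeTree convention (partition-id, min block, max block, level), and the destination is a table that the part can be written to as files, such as an S3 table with hive partitioning.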
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 565d0d3fb5f3..956e8e8913b7 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -5598,7 +5598,7 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread, const ZooK
         restarting_thread.start(true);
     });
 
-    startBackgroundMovesIfNeeded();
+    startBackgroundMoves();
 
     part_moves_between_shards_orchestrator.start();
 
@@ -9729,13 +9729,6 @@ MutationCounters StorageReplicatedMergeTree::getMutationCounters() const
     return queue.getMutationCounters();
 }
 
-void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
-{
-    if (areBackgroundMovesNeeded())
-        background_moves_assignee.start();
-}
-
-
 std::unique_ptr<MergeTreeSettings> StorageReplicatedMergeTree::getDefaultSettings() const
 {
     return std::make_unique<MergeTreeSettings>(getContext()->getReplicatedMergeTreeSettings());
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index f4f03b2e3fcd..a63a7ef86d4d 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -953,8 +953,6 @@ class StorageReplicatedMergeTree final : public MergeTreeData
 
     MutationsSnapshotPtr getMutationsSnapshot(const IMutationsSnapshot::Params & params) const override;
 
-    void startBackgroundMovesIfNeeded() override;
-
     /// Attaches restored parts to the storage.
     void attachRestoredParts(MutableDataPartsVector && parts) override;
 
diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp
index 4bd77a879d68..c5dfd426d207 100644
--- a/src/Storages/StorageURL.cpp
+++ b/src/Storages/StorageURL.cpp
@@ -724,7 +724,7 @@ void StorageURLSink::cancelBuffers()
         write_buf->cancel();
 }
 
-class PartitionedStorageURLSink : public PartitionedSink
+class PartitionedStorageURLSink : public PartitionedSink::SinkCreator
 {
 public:
     PartitionedStorageURLSink(
@@ -738,7 +738,7 @@ class PartitionedStorageURLSink : public PartitionedSink
         const CompressionMethod compression_method_,
         const HTTPHeaderEntries & headers_,
         const String & http_method_)
-        : PartitionedSink(partition_strategy_, context_, sample_block_)
+        : partition_strategy(partition_strategy_)
         , uri(uri_)
         , format(format_)
         , format_settings(format_settings_)
@@ -753,7 +753,8 @@ class PartitionedStorageURLSink : public PartitionedSink
     SinkPtr createSinkForPartition(const String & partition_id) override
     {
-        std::string partition_path = partition_strategy->getPathForWrite(uri, partition_id);
+        const auto file_path_generator = std::make_shared(uri);
+        std::string partition_path = file_path_generator->getPathForWrite(partition_id);
 
         context->getRemoteHostFilter().checkURL(Poco::URI(partition_path));
         return std::make_shared<StorageURLSink>(
@@ -761,6 +762,7 @@ class PartitionedStorageURLSink : public PartitionedSink
     }
 
 private:
+    std::shared_ptr partition_strategy;
     const String uri;
     const String format;
     const std::optional<FormatSettings> format_settings;
@@ -1403,7 +1405,7 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
             has_wildcards,
             /* partition_columns_in_data_file */true);
 
-        return std::make_shared<PartitionedStorageURLSink>(
+        auto sink_creator = std::make_shared<PartitionedStorageURLSink>(
             partition_strategy,
             uri,
             format_name,
@@ -1414,6 +1416,8 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
             compression_method,
             headers,
             http_method);
+
+        return std::make_shared<PartitionedSink>(partition_strategy, sink_creator, context, metadata_snapshot->getSampleBlock());
     }
 
     return std::make_shared<StorageURLSink>(
diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp
new file mode 100644
index 000000000000..8a25b4188426
--- /dev/null
+++ b/src/Storages/System/StorageSystemExports.cpp
@@ -0,0 +1,118 @@
+#include
+#include
+#include
+#include
+#include "Columns/ColumnString.h"
+#include "DataTypes/DataTypeString.h"
+#include
+#include "Storages/VirtualColumnUtils.h"
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+ColumnsDescription StorageSystemExports::getColumnsDescription()
+{
+    return ColumnsDescription
+    {
+        {"source_database", std::make_shared<DataTypeString>(), "Name of the source database."},
+        {"source_table", std::make_shared<DataTypeString>(), "Name of the source table."},
+        {"destination_database", std::make_shared<DataTypeString>(), "Name of the destination database."},
+        {"destination_table", std::make_shared<DataTypeString>(), "Name of the destination table."},
+        {"create_time", std::make_shared<DataTypeDateTime>(), "Date and time when the export command was submitted for execution."},
+        {"part_name", std::make_shared<DataTypeString>(), "Name of the part."}
+    };
+}
+
+void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
+{
+    const auto access = context->getAccess();
+    const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
+
+    /// Collect a set of *MergeTree tables.
+    std::map<String, std::map<String, StoragePtr>> merge_tree_tables;
+    for (const auto & db : DatabaseCatalog::instance().getDatabases())
+    {
+        /// Check if database can contain MergeTree tables
+        if (!db.second->canContainMergeTreeTables())
+            continue;
+
+        const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);
+
+        for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
+        {
+            const auto & table = iterator->table();
+            if (!table)
+                continue;
+
+            if (!dynamic_cast<const MergeTreeData *>(table.get()))
+                continue;
+
+            if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name()))
+                continue;
+
+            merge_tree_tables[db.first][iterator->name()] = table;
+        }
+    }
+
+    MutableColumnPtr col_source_database_export = ColumnString::create();
+    MutableColumnPtr col_source_table_export = ColumnString::create();
+
+    for (auto & db : merge_tree_tables)
+    {
+        for (auto & table : db.second)
+        {
+            col_source_database_export->insert(db.first);
+            col_source_table_export->insert(table.first);
+        }
+    }
+
+    ColumnPtr col_source_database = std::move(col_source_database_export);
+    ColumnPtr col_source_table = std::move(col_source_table_export);
+
+    /// Determine what tables are needed by the conditions in the query.
+    {
+        Block filtered_block
+        {
+            { col_source_database, std::make_shared<DataTypeString>(), "source_database" },
+            { col_source_table, std::make_shared<DataTypeString>(), "source_table" },
+        };
+
+        VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context);
+
+        if (!filtered_block.rows())
+            return;
+
+        col_source_database = filtered_block.getByName("source_database").column;
+        col_source_table = filtered_block.getByName("source_table").column;
+    }
+
+    for (size_t i_storage = 0; i_storage < col_source_database->size(); ++i_storage)
+    {
+        auto database = (*col_source_database)[i_storage].safeGet<String>();
+        auto table = (*col_source_table)[i_storage].safeGet<String>();
+
+        std::vector<MergeTreeExportStatus> statuses;
+        {
+            const IStorage * storage = merge_tree_tables[database][table].get();
+            if (const auto * merge_tree = dynamic_cast<const MergeTreeData *>(storage))
+                statuses = merge_tree->getExportsStatus();
+        }
+
+        for (const MergeTreeExportStatus & status : statuses)
+        {
+            size_t col_num = 0;
+            res_columns[col_num++]->insert(status.source_database);
+            res_columns[col_num++]->insert(status.source_table);
+            res_columns[col_num++]->insert(status.destination_database);
+            res_columns[col_num++]->insert(status.destination_table);
+            res_columns[col_num++]->insert(status.create_time);
+            res_columns[col_num++]->insert(status.part_name);
+        }
+    }
+}
+
+}
diff --git a/src/Storages/System/StorageSystemExports.h b/src/Storages/System/StorageSystemExports.h
new file mode 100644
index 000000000000..e13fbfa26aaa
--- /dev/null
+++ b/src/Storages/System/StorageSystemExports.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include
+
+
+namespace DB
+{
+
+class Context;
+
+
+class StorageSystemExports final : public IStorageSystemOneBlock
+{
+public:
+    std::string getName() const override { return "SystemExports"; }
+
+    static ColumnsDescription getColumnsDescription();
+
+protected:
+    using IStorageSystemOneBlock::IStorageSystemOneBlock;
+
+    void fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const override;
+};
+
+}
diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp
index 9249fb3530b6..ad07a4231260 100644
--- a/src/Storages/System/attachSystemTables.cpp
+++ b/src/Storages/System/attachSystemTables.cpp
@@ -102,6 +102,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
diff --git a/tests/queries/0_stateless/01271_show_privileges.reference b/tests/queries/0_stateless/01271_show_privileges.reference
index 2d4bc4ecc150..3cd42125ace0 100644
--- a/tests/queries/0_stateless/01271_show_privileges.reference
+++ b/tests/queries/0_stateless/01271_show_privileges.reference
@@ -43,6 +43,7 @@ ALTER TTL ['ALTER MODIFY TTL','MODIFY TTL'] TABLE ALTER TABLE
 ALTER MATERIALIZE TTL ['MATERIALIZE TTL'] TABLE ALTER TABLE
 ALTER SETTINGS ['ALTER SETTING','ALTER MODIFY SETTING','MODIFY SETTING','RESET SETTING'] TABLE ALTER TABLE
 ALTER MOVE PARTITION ['ALTER MOVE PART','MOVE PARTITION','MOVE PART'] TABLE ALTER TABLE
+ALTER EXPORT PART ['ALTER EXPORT PART','EXPORT PART'] TABLE ALTER TABLE
 ALTER FETCH PARTITION ['ALTER FETCH PART','FETCH PARTITION'] TABLE ALTER TABLE
 ALTER FREEZE PARTITION ['FREEZE PARTITION','UNFREEZE'] TABLE ALTER TABLE
 ALTER UNLOCK SNAPSHOT ['UNLOCK SNAPSHOT'] TABLE ALTER TABLE
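A sketch of how the new privilege and system table would be used, assuming the table is registered as system.exports by the attach above and that GRANT accepts the new access type like any other ALTER sub-privilege (user and table names are illustrative):

GRANT ALTER EXPORT PART ON db.source_mt TO exporter;

SELECT source_database, source_table, destination_database, destination_table, part_name, create_time
FROM system.exports
WHERE source_database = 'db' AND source_table = 'source_mt';

The WHERE clause matches the predicate push-down in fillData above, which pre-filters candidate tables by source_database/source_table before collecting per-table statuses.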
diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference
new file mode 100644
index 000000000000..75c32111f7a3
--- /dev/null
+++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference
@@ -0,0 +1,16 @@
+---- Export 2020_1_1_0 and 2021_2_2_0
+---- Both data parts should appear
+test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1
+test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2
+test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3
+test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4
+---- Export the same part again, it should be idempotent
+test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1
+test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2
+test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3
+test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4
+---- Data in roundtrip MergeTree table (should match s3_table)
+1 2020
+2 2020
+3 2020
+4 2021
diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh
new file mode 100755
index 000000000000..5e886eaf4755
--- /dev/null
+++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh
@@ -0,0 +1,38 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+mt_table="mt_table_${RANDOM}"
+s3_table="s3_table_${RANDOM}"
+mt_table_roundtrip="mt_table_roundtrip_${RANDOM}"
+
+query() {
+    $CLICKHOUSE_CLIENT --query "$1"
+}
+
+query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip"
+
+query "CREATE TABLE $mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()"
+query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year"
+
+query "INSERT INTO $mt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)"
+echo "---- Export 2020_1_1_0 and 2021_2_2_0"
+query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1"
+query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1"
+
+echo "---- Both data parts should appear"
+query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id"
+
+echo "---- Export the same part again, it should be idempotent"
+query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1"
+
+query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id"
+
+query "CREATE TABLE $mt_table_roundtrip ENGINE = MergeTree() PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table"
+
+echo "---- Data in roundtrip MergeTree table (should match s3_table)"
+query "SELECT DISTINCT ON (id) * FROM $mt_table_roundtrip ORDER BY id"
+
+query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip"
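As the reference file shows, each exported part becomes one file under a hive-style prefix (filename, then year=value, then part_name.parquet), and re-running the export does not duplicate data. A hypothetical way to eyeball the resulting layout directly from the destination table, using the same _path virtual column the test normalizes with replaceRegexpAll:

SELECT DISTINCT _path FROM s3_table ORDER BY _path;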
diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.reference
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql
new file mode 100644
index 000000000000..136b12142383
--- /dev/null
+++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql
@@ -0,0 +1,22 @@
+-- Tags: no-parallel
+
+DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table;
+
+CREATE TABLE 03572_mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple();
+
+INSERT INTO 03572_mt_table VALUES (1, 2020);
+
+-- Create a table with a different partition key and export a partition to it. It should throw
+CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x;
+
+ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table
+SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS}
+
+DROP TABLE 03572_invalid_schema_table;
+
+-- The only partition strategy that supports exports is hive. Wildcard should throw
+CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table/{_partition_id}', format='Parquet', partition_strategy='wildcard') PARTITION BY (id, year);
+
+ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NOT_IMPLEMENTED}
+
+DROP TABLE IF EXISTS 03572_mt_table, 03572_invalid_schema_table;
diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference
new file mode 100644
index 000000000000..07f1ec6376a6
--- /dev/null
+++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.reference
@@ -0,0 +1,16 @@
+---- Get actual part names and export them
+---- Both data parts should appear
+1 2020
+2 2020
+3 2020
+4 2021
+---- Export the same part again, it should be idempotent
+1 2020
+2 2020
+3 2020
+4 2021
+---- Data in roundtrip ReplicatedMergeTree table (should match s3_table)
+1 2020
+2 2020
+3 2020
+4 2021
diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh
new file mode 100755
index 000000000000..3b955d4bbbe5
--- /dev/null
+++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage.sh
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+# Tags: replica, no-parallel, no-replicated-database
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
"$CURDIR"/../shell_config.sh + +rmt_table="rmt_table_${RANDOM}" +s3_table="s3_table_${RANDOM}" +rmt_table_roundtrip="rmt_table_roundtrip_${RANDOM}" + +query() { + $CLICKHOUSE_CLIENT --query "$1" +} + +query "DROP TABLE IF EXISTS $rmt_table, $s3_table, $rmt_table_roundtrip" + +query "CREATE TABLE $rmt_table (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$rmt_table', 'replica1') PARTITION BY year ORDER BY tuple()" +query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year" + +query "INSERT INTO $rmt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)" + +echo "---- Get actual part names and export them" +part_2020=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$rmt_table' AND partition = '2020' ORDER BY name LIMIT 1" | tr -d '\n') +part_2021=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$rmt_table' AND partition = '2021' ORDER BY name LIMIT 1" | tr -d '\n') + +query "ALTER TABLE $rmt_table EXPORT PART '$part_2020' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" +query "ALTER TABLE $rmt_table EXPORT PART '$part_2021' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +echo "---- Both data parts should appear" +query "SELECT * FROM $s3_table ORDER BY id" + +echo "---- Export the same part again, it should be idempotent" +query "ALTER TABLE $rmt_table EXPORT PART '$part_2020' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" + +query "SELECT * FROM $s3_table ORDER BY id" + +query "CREATE TABLE $rmt_table_roundtrip ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/$rmt_table_roundtrip', 'replica1') PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table" + +echo "---- Data in roundtrip ReplicatedMergeTree table (should match s3_table)" +query "SELECT * FROM $rmt_table_roundtrip ORDER BY id" + +query "DROP TABLE IF EXISTS $rmt_table, $s3_table, $rmt_table_roundtrip" diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.reference b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.reference new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql new file mode 100644 index 000000000000..e0aa82532190 --- /dev/null +++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql @@ -0,0 +1,22 @@ +-- Tags: no-parallel + +DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table; + +CREATE TABLE 03572_rmt_table (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_03572_rmt/03572_rmt_table', 'replica1') PARTITION BY year ORDER BY tuple(); + +INSERT INTO 03572_rmt_table VALUES (1, 2020); + +-- Create a table with a different partition key and export a partition to it. 
diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.reference b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.reference
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql
new file mode 100644
index 000000000000..e0aa82532190
--- /dev/null
+++ b/tests/queries/0_stateless/03572_export_replicated_merge_tree_part_to_object_storage_simple.sql
@@ -0,0 +1,22 @@
+-- Tags: no-parallel
+
+DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table;
+
+CREATE TABLE 03572_rmt_table (id UInt64, year UInt16) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_03572_rmt/03572_rmt_table', 'replica1') PARTITION BY year ORDER BY tuple();
+
+INSERT INTO 03572_rmt_table VALUES (1, 2020);
+
+-- Create a table with a different partition key and export a partition to it. It should throw
+CREATE TABLE 03572_invalid_schema_table (id UInt64, x UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table', format='Parquet', partition_strategy='hive') PARTITION BY x;
+
+ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_invalid_schema_table
+SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError INCOMPATIBLE_COLUMNS}
+
+DROP TABLE 03572_invalid_schema_table;
+
+-- The only partition strategy that supports exports is hive. Wildcard should throw
+CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='03572_invalid_schema_table/{_partition_id}', format='Parquet', partition_strategy='wildcard') PARTITION BY (id, year);
+
+ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_invalid_schema_table SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NOT_IMPLEMENTED}
+
+DROP TABLE IF EXISTS 03572_rmt_table, 03572_invalid_schema_table;
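For contrast with the two failing cases above, a destination that passes both checks mirrors the source's partition column and uses the hive strategy. A sketch reusing the s3_conn collection from the tests (the destination table name is illustrative):

CREATE TABLE 03572_valid_dest (id UInt64, year UInt16)
ENGINE = S3(s3_conn, filename='03572_valid_dest', format='Parquet', partition_strategy='hive')
PARTITION BY year;

ALTER TABLE 03572_rmt_table EXPORT PART '2020_0_0_0' TO TABLE 03572_valid_dest
SETTINGS allow_experimental_export_merge_tree_part = 1;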