From 3cbe981f0b555d5add777f3c59dddba7c0c0de23 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 16 Sep 2025 13:30:42 -0300 Subject: [PATCH 01/12] improve observability a bit, simplify sink --- src/Common/CurrentMetrics.cpp | 1 + src/Interpreters/Context.cpp | 4 + src/Interpreters/Context.h | 4 + src/Storages/IStorage.h | 13 +- src/Storages/MergeTree/ExportList.cpp | 44 +++++++ src/Storages/MergeTree/ExportList.h | 75 +++++++++++ src/Storages/MergeTree/MergeTreeData.cpp | 77 ++++++++---- .../StorageObjectStorageImporterSink.cpp | 65 ---------- .../StorageObjectStorageImporterSink.h | 54 -------- .../ObjectStorage/StorageObjectStorage.cpp | 12 +- .../ObjectStorage/StorageObjectStorage.h | 3 +- .../StorageObjectStorageCluster.cpp | 7 +- .../StorageObjectStorageCluster.h | 3 +- src/Storages/System/StorageSystemExports.cpp | 116 +++++------------- src/Storages/System/StorageSystemMerges.cpp | 2 +- src/Storages/System/attachSystemTables.cpp | 2 + 16 files changed, 226 insertions(+), 256 deletions(-) create mode 100644 src/Storages/MergeTree/ExportList.cpp create mode 100644 src/Storages/MergeTree/ExportList.h delete mode 100644 src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp delete mode 100644 src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 6cf960218232..bfb492a812aa 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -10,6 +10,7 @@ M(Merge, "Number of executing background merges") \ M(MergeParts, "Number of source parts participating in current background merges") \ M(Move, "Number of currently executing moves") \ + M(Export, "Number of currently executing exports") \ M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \ M(ReplicatedFetch, "Number of data parts being fetched from replica") \ M(ReplicatedSend, "Number of data parts being sent to replicas") \ diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 6bc374c48c1e..14723c28025f 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -465,6 +466,7 @@ struct ContextSharedPart : boost::noncopyable GlobalOvercommitTracker global_overcommit_tracker; MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree) MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree) + ExportsList exports_list; /// The list of executing exports (for (Replicated)?MergeTree) ReplicatedFetchList replicated_fetch_list; RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView) ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections. @@ -1158,6 +1160,8 @@ MergeList & Context::getMergeList() { return shared->merge_list; } const MergeList & Context::getMergeList() const { return shared->merge_list; } MovesList & Context::getMovesList() { return shared->moves_list; } const MovesList & Context::getMovesList() const { return shared->moves_list; } +ExportsList & Context::getExportsList() { return shared->exports_list; } +const ExportsList & Context::getExportsList() const { return shared->exports_list; } ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; } const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; } RefreshSet & Context::getRefreshSet() { return shared->refresh_set; } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 14c61ca2f176..297f12d58a7d 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -88,6 +88,7 @@ class AsynchronousMetrics; class BackgroundSchedulePool; class MergeList; class MovesList; +class ExportsList; class ReplicatedFetchList; class RefreshSet; class Cluster; @@ -1141,6 +1142,9 @@ class Context: public ContextData, public std::enable_shared_from_this MovesList & getMovesList(); const MovesList & getMovesList() const; + ExportsList & getExportsList(); + const ExportsList & getExportsList() const; + ReplicatedFetchList & getReplicatedFetchList(); const ReplicatedFetchList & getReplicatedFetchList() const; diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index c86c06ca64a3..e4658a2f4f02 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -444,21 +444,10 @@ class IStorage : public std::enable_shared_from_this, public TypePromo return false; } - struct ImportStats - { - ExecutionStatus status; - std::size_t elapsed_ns = 0; - std::size_t bytes_on_disk = 0; - std::size_t read_rows = 0; - std::size_t read_bytes = 0; - std::string file_path = ""; - }; - virtual SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - ContextPtr /* context */, - std::function /* stats_log */) + ContextPtr /* context */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); } diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp new file mode 100644 index 000000000000..48f4b26a2f4a --- /dev/null +++ b/src/Storages/MergeTree/ExportList.cpp @@ -0,0 +1,44 @@ +#include + +namespace DB +{ + +ExportsListElement::ExportsListElement( + const StorageID & source_table_id_, + const StorageID & destination_table_id_, + UInt64 part_size_, + const String & part_name_, + UInt64 total_rows_to_read_, + UInt64 total_size_bytes_compressed_, + UInt64 total_size_bytes_uncompressed_, + time_t create_time_) +: source_table_id(source_table_id_) +, destination_table_id(destination_table_id_) +, part_size(part_size_) +, part_name(part_name_) +, total_rows_to_read(total_rows_to_read_) +, total_size_bytes_compressed(total_size_bytes_compressed_) +, total_size_bytes_uncompressed(total_size_bytes_uncompressed_) +, create_time(create_time_) +{ +} + +ExportInfo ExportsListElement::getInfo() const +{ + ExportInfo res; + res.source_database = source_table_id.database_name; + res.source_table = source_table_id.table_name; + res.destination_database = destination_table_id.database_name; + res.destination_table = destination_table_id.table_name; + res.part_name = part_name; + res.rows_read = rows_read; + res.total_rows_to_read = total_rows_to_read; + res.total_size_bytes_compressed = total_size_bytes_compressed; + res.total_size_bytes_uncompressed = total_size_bytes_uncompressed; + res.bytes_read_uncompressed = bytes_read_uncompressed; + res.create_time = create_time; + res.elapsed = elapsed; + return res; +} + +} diff --git a/src/Storages/MergeTree/ExportList.h b/src/Storages/MergeTree/ExportList.h new file mode 100644 index 000000000000..d0f1708ef6ea --- /dev/null +++ b/src/Storages/MergeTree/ExportList.h @@ -0,0 +1,75 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace CurrentMetrics +{ + extern const Metric Export; +} + +namespace DB +{ + +struct ExportInfo +{ + String source_database; + String source_table; + String destination_database; + String destination_table; + String part_name; + UInt64 rows_read; + UInt64 total_rows_to_read; + UInt64 total_size_bytes_compressed; + UInt64 total_size_bytes_uncompressed; + UInt64 bytes_read_uncompressed; + time_t create_time = 0; + Float64 elapsed; +}; + +struct ExportsListElement : private boost::noncopyable +{ + const StorageID source_table_id; + const StorageID destination_table_id; + const UInt64 part_size; + const String part_name; + UInt64 rows_read {0}; + UInt64 total_rows_to_read {0}; + UInt64 total_size_bytes_compressed {0}; + UInt64 total_size_bytes_uncompressed {0}; + UInt64 bytes_read_uncompressed {0}; + time_t create_time {0}; + Float64 elapsed {0}; + + Stopwatch watch; + + ExportsListElement( + const StorageID & source_table_id_, + const StorageID & destination_table_id_, + UInt64 part_size_, + const String & part_name_, + UInt64 total_rows_to_read_, + UInt64 total_size_bytes_compressed_, + UInt64 total_size_bytes_uncompressed_, + time_t create_time_); + + ExportInfo getInfo() const; +}; + + +class ExportsList final : public BackgroundProcessList +{ +private: + using Parent = BackgroundProcessList; + +public: + ExportsList() + : Parent(CurrentMetrics::Export) + {} +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 77604d5de286..b365397e1594 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include #include @@ -5947,23 +5948,15 @@ void MergeTreeData::exportPartToTableImpl( const MergeTreeExportManifest & manifest, ContextPtr local_context) { - std::function part_log_wrapper = [this, manifest](ImportStats stats) { - const auto & data_part = manifest.data_part; - - writePartLog( - PartLogElement::Type::EXPORT_PART, - stats.status, - stats.elapsed_ns, - data_part->name, - data_part, - {data_part}, - nullptr, - nullptr); - - std::lock_guard inner_lock(export_manifests_mutex); - - export_manifests.erase(manifest); - }; + auto exports_list_entry = getContext()->getExportsList().insert( + getStorageID(), + manifest.destination_storage_id, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->name, + manifest.data_part->rows_count, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->getBytesUncompressedOnDisk(), + manifest.create_time); auto metadata_snapshot = getInMemoryMetadataPtr(); Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); @@ -5994,8 +5987,7 @@ void MergeTreeData::exportPartToTableImpl( auto sink = destination_storage->import( manifest.data_part->name, block_with_partition_values, - local_context, - part_log_wrapper); + local_context); /// Most likely the file has already been imported, so we can just return if (!sink) @@ -6045,12 +6037,55 @@ void MergeTreeData::exportPartToTableImpl( QueryPlanOptimizationSettings optimization_settings(local_context); auto pipeline_settings = BuildQueryPipelineSettings(local_context); auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); + + builder->setProgressCallback([&exports_list_entry](const Progress & progress) + { + (*exports_list_entry)->bytes_read_uncompressed += progress.read_bytes; + (*exports_list_entry)->rows_read += progress.read_rows; + (*exports_list_entry)->elapsed = (*exports_list_entry)->watch.elapsedSeconds(); + }); + auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); pipeline.complete(sink); - CompletedPipelineExecutor exec(pipeline); - exec.execute(); + try + { + CompletedPipelineExecutor exec(pipeline); + exec.execute(); + + std::lock_guard inner_lock(export_manifests_mutex); + writePartLog( + PartLogElement::Type::EXPORT_PART, + {}, + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr); + + export_manifests.erase(manifest); + } + catch (const Exception &) + { + tryLogCurrentException(__PRETTY_FUNCTION__, "Exception is in export part task"); + + std::lock_guard inner_lock(export_manifests_mutex); + writePartLog( + PartLogElement::Type::EXPORT_PART, + ExecutionStatus::fromCurrentException("", true), + static_cast((*exports_list_entry)->elapsed * 1000000000), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr); + + export_manifests.erase(manifest); + + throw; + } } void MergeTreeData::movePartitionToShard(const ASTPtr & /*partition*/, bool /*move_part*/, const String & /*to*/, ContextPtr /*query_context*/) diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp deleted file mode 100644 index 82a1bbfc81ea..000000000000 --- a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.cpp +++ /dev/null @@ -1,65 +0,0 @@ - -#include - -namespace DB -{ - -StorageObjectStorageImporterSink::StorageObjectStorageImporterSink( - const std::string & path_, - const ObjectStoragePtr & object_storage_, - const ConfigurationPtr & configuration_, - const std::optional & format_settings_, - const Block & sample_block_, - const std::function & part_log_, - const ContextPtr & context_) - : SinkToStorage(sample_block_) - , object_storage(object_storage_) - , configuration(configuration_) - , format_settings(format_settings_) - , sample_block(sample_block_) - , context(context_) - , part_log(part_log_) -{ - stats.file_path = path_; - sink = std::make_shared( - stats.file_path, - object_storage, - configuration, - format_settings, - sample_block, - context); -} - -String StorageObjectStorageImporterSink::getName() const -{ - return "StorageObjectStorageMergeTreePartImporterSink"; -} - -void StorageObjectStorageImporterSink::consume(Chunk & chunk) -{ - sink->consume(chunk); - stats.read_bytes += chunk.bytes(); - stats.read_rows += chunk.getNumRows(); -} - -void StorageObjectStorageImporterSink::onFinish() -{ - sink->onFinish(); - - if (const auto object_metadata = object_storage->tryGetObjectMetadata(stats.file_path)) - { - stats.bytes_on_disk = object_metadata->size_bytes; - } - - part_log(stats); -} - -void StorageObjectStorageImporterSink::onException(std::exception_ptr exception) -{ - sink->onException(exception); - - stats.status = ExecutionStatus(-1, "Error importing part"); - part_log(stats); -} - -} diff --git a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h b/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h deleted file mode 100644 index 051f5196f964..000000000000 --- a/src/Storages/ObjectStorage/MergeTree/StorageObjectStorageImporterSink.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ - -using DataPartPtr = std::shared_ptr; - -/* - * Wrapper around `StorageObjectsStorageSink` that takes care of accounting & metrics for partition export - */ -class StorageObjectStorageImporterSink : public SinkToStorage -{ -public: - using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr; - - StorageObjectStorageImporterSink( - const std::string & path_, - const ObjectStoragePtr & object_storage_, - const ConfigurationPtr & configuration_, - const std::optional & format_settings_, - const Block & sample_block_, - const std::function & part_log_, - const ContextPtr & context_); - - String getName() const override; - - void consume(Chunk & chunk) override; - - void onFinish() override; - - void onException(std::exception_ptr exception) override; - -private: - std::shared_ptr sink; - ObjectStoragePtr object_storage; - ConfigurationPtr configuration; - std::optional format_settings; - Block sample_block; - ContextPtr context; - std::function part_log; - - IStorage::ImportStats stats; -}; - -} diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 1b0d322840fd..16a105d86142 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -454,8 +453,7 @@ bool StorageObjectStorage::supportsImport() const SinkToStoragePtr StorageObjectStorage::import( const std::string & file_name, Block & block_with_partition_values, - ContextPtr local_context, - std::function part_log) + ContextPtr local_context) { std::string partition_key; @@ -476,16 +474,14 @@ SinkToStoragePtr StorageObjectStorage::import( LOG_INFO(getLogger("StorageObjectStorage"), "File {} already exists, skipping import", file_path); return nullptr; } - - return std::make_shared( + + return std::make_shared( file_path, object_storage, configuration, format_settings, getInMemoryMetadataPtr()->getSampleBlock(), - part_log, - local_context - ); + local_context); } void StorageObjectStorage::truncate( diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 14c83e722948..c5f4ae7c96d4 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -106,8 +106,7 @@ class StorageObjectStorage : public IStorage SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - ContextPtr /* context */, - std::function /* part_log */) override; + ContextPtr /* context */) override; void truncate( const ASTPtr & query, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index d5f0ce7e1330..5c726bc7160e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -612,13 +612,12 @@ bool StorageObjectStorageCluster::supportsImport() const SinkToStoragePtr StorageObjectStorageCluster::import( const std::string & file_name, Block & block_with_partition_values, - ContextPtr context, - std::function part_log) + ContextPtr context) { if (pure_storage) - return pure_storage->import(file_name, block_with_partition_values, context, part_log); + return pure_storage->import(file_name, block_with_partition_values, context); - return IStorageCluster::import(file_name, block_with_partition_values, context, part_log); + return IStorageCluster::import(file_name, block_with_partition_values, context); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 41d7e7e7d867..0c07aff7cb59 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -63,8 +63,7 @@ class StorageObjectStorageCluster : public IStorageCluster SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - ContextPtr /* context */, - std::function /* part_log */) override; + ContextPtr /* context */) override; private: void updateQueryToSendIfNeeded( diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp index 8a25b4188426..f39b46dd3749 100644 --- a/src/Storages/System/StorageSystemExports.cpp +++ b/src/Storages/System/StorageSystemExports.cpp @@ -1,11 +1,9 @@ +#include #include #include -#include -#include -#include "Columns/ColumnString.h" -#include "DataTypes/DataTypeString.h" -#include -#include "Storages/VirtualColumnUtils.h" +#include +#include +#include #include #include #include @@ -23,95 +21,39 @@ ColumnsDescription StorageSystemExports::getColumnsDescription() {"destination_database", std::make_shared(), "Name of the destination database."}, {"destination_table", std::make_shared(), "Name of the destination table."}, {"create_time", std::make_shared(), "Date and time when the export command was submitted for execution."}, - {"part_name", std::make_shared(), "Name of the part"} + {"part_name", std::make_shared(), "Name of the part"}, + {"elapsed", std::make_shared(), "The time elapsed (in seconds) since the export started."}, + {"rows_read", std::make_shared(), "The number of rows read from the exported part."}, + {"total_rows_to_read", std::make_shared(), "The total number of rows to read from the exported part."}, + {"total_size_bytes_compressed", std::make_shared(), "The total size of the compressed data in the exported part."}, + {"total_size_bytes_uncompressed", std::make_shared(), "The total size of the uncompressed data in the exported part."}, + {"bytes_read_uncompressed", std::make_shared(), "The number of uncompressed bytes read from the exported part."}, }; } -void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector) const +void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node *, std::vector) const { const auto access = context->getAccess(); - const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES); - - /// Collect a set of *MergeTree tables. - std::map> merge_tree_tables; - for (const auto & db : DatabaseCatalog::instance().getDatabases()) - { - /// Check if database can contain MergeTree tables - if (!db.second->canContainMergeTreeTables()) - continue; - - const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first); - - for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) - { - const auto & table = iterator->table(); - if (!table) - continue; + const bool check_access_for_tables = !access->isGranted(AccessType::SHOW_TABLES); - if (!dynamic_cast(table.get())) - continue; - - if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name())) - continue; - - merge_tree_tables[db.first][iterator->name()] = table; - } - } - - MutableColumnPtr col_source_database_export = ColumnString::create(); - MutableColumnPtr col_source_table_export = ColumnString::create(); - - for (auto & db : merge_tree_tables) + for (const auto & export_info : context->getExportsList().get()) { - for (auto & table : db.second) - { - col_source_database_export->insert(db.first); - col_source_table_export->insert(table.first); - } - } - - ColumnPtr col_source_database = std::move(col_source_database_export); - ColumnPtr col_source_table = std::move(col_source_table_export); - - /// Determine what tables are needed by the conditions in the query. - { - Block filtered_block - { - { col_source_database, std::make_shared(), "source_database" }, - { col_source_table, std::make_shared(), "source_table" }, - }; - - VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context); - - if (!filtered_block.rows()) - return; - - col_source_database = filtered_block.getByName("source_database").column; - col_source_table = filtered_block.getByName("source_table").column; - } - - for (size_t i_storage = 0; i_storage < col_source_database->size(); ++i_storage) - { - auto database = (*col_source_database)[i_storage].safeGet(); - auto table = (*col_source_table)[i_storage].safeGet(); - - std::vector statuses; - { - const IStorage * storage = merge_tree_tables[database][table].get(); - if (const auto * merge_tree = dynamic_cast(storage)) - statuses = merge_tree->getExportsStatus(); - } + if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, export_info.source_database, export_info.source_table)) + continue; - for (const MergeTreeExportStatus & status : statuses) - { - size_t col_num = 0; - res_columns[col_num++]->insert(status.source_database); - res_columns[col_num++]->insert(status.source_table); - res_columns[col_num++]->insert(status.destination_database); - res_columns[col_num++]->insert(status.destination_table); - res_columns[col_num++]->insert(status.create_time); - res_columns[col_num++]->insert(status.part_name); - } + size_t i = 0; + res_columns[i++]->insert(export_info.source_database); + res_columns[i++]->insert(export_info.source_table); + res_columns[i++]->insert(export_info.destination_database); + res_columns[i++]->insert(export_info.destination_table); + res_columns[i++]->insert(export_info.create_time); + res_columns[i++]->insert(export_info.part_name); + res_columns[i++]->insert(export_info.elapsed); + res_columns[i++]->insert(export_info.rows_read); + res_columns[i++]->insert(export_info.total_rows_to_read); + res_columns[i++]->insert(export_info.total_size_bytes_compressed); + res_columns[i++]->insert(export_info.total_size_bytes_uncompressed); + res_columns[i++]->insert(export_info.bytes_read_uncompressed); } } diff --git a/src/Storages/System/StorageSystemMerges.cpp b/src/Storages/System/StorageSystemMerges.cpp index 0fca5dc84a2b..c8c569ff4696 100644 --- a/src/Storages/System/StorageSystemMerges.cpp +++ b/src/Storages/System/StorageSystemMerges.cpp @@ -1,5 +1,5 @@ -#include #include +#include #include #include diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp index ad07a4231260..a0f707b70a90 100644 --- a/src/Storages/System/attachSystemTables.cpp +++ b/src/Storages/System/attachSystemTables.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -207,6 +208,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b attach(context, system_database, "histogram_metrics", "Contains histogram metrics which can be calculated instantly and exported in the Prometheus format. For example, the keeper response time. This table is always up to date."); attach(context, system_database, "merges", "Contains a list of merges currently executing merges of MergeTree tables and their progress. Each merge operation is represented by a single row."); attach(context, system_database, "moves", "Contains information about in-progress data part moves of MergeTree tables. Each data part movement is represented by a single row."); + attach(context, system_database, "exports", "Contains a list of exports currently executing exports of MergeTree tables and their progress. Each export operation is represented by a single row."); attach(context, system_database, "mutations", "Contains a list of mutations and their progress. Each mutation command is represented by a single row."); attachNoDescription(context, system_database, "replicas", "Contains information and status of all table replicas on current server. Each replica is represented by a single row."); attach(context, system_database, "replication_queue", "Contains information about tasks from replication queues stored in ClickHouse Keeper, or ZooKeeper, for each table replica."); From 369805546f045e3f4d953d9083371571e24477be Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 16 Sep 2025 13:37:51 -0300 Subject: [PATCH 02/12] add docs to the import api --- src/Storages/IStorage.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index e4658a2f4f02..fad47f7cac37 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -444,6 +444,11 @@ class IStorage : public std::enable_shared_from_this, public TypePromo return false; } + /* + It is kind of hard to describe this API. It is currently only implemented in StorageObjectStorage. + It is meant to be used to import merge tree data parts into object storage. It is similar to the write API, + but it won't re-partition the data and should allow the filename to be set by the caller. + */ virtual SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, From 9d9dbe8745cf397b8eb572e6b2bda82f40aca2be Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 16 Sep 2025 15:52:34 -0300 Subject: [PATCH 03/12] add memory consumption tracking --- src/Storages/MergeTree/ExportList.cpp | 16 ++++++++++++- src/Storages/MergeTree/ExportList.h | 12 +++++++++- src/Storages/MergeTree/MergeTreeData.cpp | 25 ++++++++++++++++---- src/Storages/MergeTree/MergeTreeData.h | 4 +++- src/Storages/System/StorageSystemExports.cpp | 4 ++++ 5 files changed, 54 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp index 48f4b26a2f4a..d726f6beee6e 100644 --- a/src/Storages/MergeTree/ExportList.cpp +++ b/src/Storages/MergeTree/ExportList.cpp @@ -11,7 +11,8 @@ ExportsListElement::ExportsListElement( UInt64 total_rows_to_read_, UInt64 total_size_bytes_compressed_, UInt64 total_size_bytes_uncompressed_, - time_t create_time_) + time_t create_time_, + const ContextPtr & context) : source_table_id(source_table_id_) , destination_table_id(destination_table_id_) , part_size(part_size_) @@ -21,6 +22,7 @@ ExportsListElement::ExportsListElement( , total_size_bytes_uncompressed(total_size_bytes_uncompressed_) , create_time(create_time_) { + thread_group = ThreadGroup::createForBackgroundProcess(context); } ExportInfo ExportsListElement::getInfo() const @@ -36,9 +38,21 @@ ExportInfo ExportsListElement::getInfo() const res.total_size_bytes_compressed = total_size_bytes_compressed; res.total_size_bytes_uncompressed = total_size_bytes_uncompressed; res.bytes_read_uncompressed = bytes_read_uncompressed; + res.memory_usage = getMemoryUsage(); + res.peak_memory_usage = getPeakMemoryUsage(); res.create_time = create_time; res.elapsed = elapsed; return res; } +UInt64 ExportsListElement::getMemoryUsage() const +{ + return thread_group->memory_tracker.get(); +} + +UInt64 ExportsListElement::getPeakMemoryUsage() const +{ + return thread_group->memory_tracker.getPeak(); +} + } diff --git a/src/Storages/MergeTree/ExportList.h b/src/Storages/MergeTree/ExportList.h index d0f1708ef6ea..6e2f52360092 100644 --- a/src/Storages/MergeTree/ExportList.h +++ b/src/Storages/MergeTree/ExportList.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -27,6 +28,8 @@ struct ExportInfo UInt64 total_size_bytes_compressed; UInt64 total_size_bytes_uncompressed; UInt64 bytes_read_uncompressed; + UInt64 memory_usage; + UInt64 peak_memory_usage; time_t create_time = 0; Float64 elapsed; }; @@ -46,6 +49,7 @@ struct ExportsListElement : private boost::noncopyable Float64 elapsed {0}; Stopwatch watch; + ThreadGroupPtr thread_group; ExportsListElement( const StorageID & source_table_id_, @@ -55,9 +59,13 @@ struct ExportsListElement : private boost::noncopyable UInt64 total_rows_to_read_, UInt64 total_size_bytes_compressed_, UInt64 total_size_bytes_uncompressed_, - time_t create_time_); + time_t create_time_, + const ContextPtr & context); ExportInfo getInfo() const; + + UInt64 getMemoryUsage() const; + UInt64 getPeakMemoryUsage() const; }; @@ -72,4 +80,6 @@ class ExportsList final : public BackgroundProcessList; + } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index b365397e1594..479915de8e49 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -5956,7 +5956,15 @@ void MergeTreeData::exportPartToTableImpl( manifest.data_part->rows_count, manifest.data_part->getBytesOnDisk(), manifest.data_part->getBytesUncompressedOnDisk(), - manifest.create_time); + manifest.create_time, + local_context); + + // Switch to the export's thread group for memory tracking + std::optional switcher; + if (exports_list_entry) + { + switcher.emplace((*exports_list_entry)->thread_group, "", /*allow_existing_group*/ true); + } auto metadata_snapshot = getInMemoryMetadataPtr(); Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); @@ -6063,7 +6071,8 @@ void MergeTreeData::exportPartToTableImpl( manifest.data_part, {manifest.data_part}, nullptr, - nullptr); + nullptr, + exports_list_entry.get()); export_manifests.erase(manifest); } @@ -6080,7 +6089,8 @@ void MergeTreeData::exportPartToTableImpl( manifest.data_part, {manifest.data_part}, nullptr, - nullptr); + nullptr, + exports_list_entry.get()); export_manifests.erase(manifest); @@ -8672,7 +8682,8 @@ void MergeTreeData::writePartLog( const DataPartPtr & result_part, const DataPartsVector & source_parts, const MergeListEntry * merge_entry, - std::shared_ptr profile_counters) + std::shared_ptr profile_counters, + const ExportsListEntry * exports_entry) try { auto table_id = getStorageID(); @@ -8740,6 +8751,12 @@ try part_log_elem.rows = (*merge_entry)->rows_written; part_log_elem.peak_memory_usage = (*merge_entry)->getMemoryTracker().getPeak(); } + else if (exports_entry) + { + part_log_elem.rows_read = (*exports_entry)->rows_read; + part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed; + part_log_elem.peak_memory_usage = (*exports_entry)->getMemoryTracker().getPeak(); + } if (profile_counters) { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 7e148a7d0539..b9007af0e6f0 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -1509,7 +1510,8 @@ class MergeTreeData : public IStorage, public WithMutableContext const DataPartPtr & result_part, const DataPartsVector & source_parts, const MergeListEntry * merge_entry, - std::shared_ptr profile_counters); + std::shared_ptr profile_counters, + const ExportsListEntry * exports_entry = nullptr); /// If part is assigned to merge or mutation (possibly replicated) /// Should be overridden by children, because they can have different diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp index f39b46dd3749..6cb3befb4177 100644 --- a/src/Storages/System/StorageSystemExports.cpp +++ b/src/Storages/System/StorageSystemExports.cpp @@ -28,6 +28,8 @@ ColumnsDescription StorageSystemExports::getColumnsDescription() {"total_size_bytes_compressed", std::make_shared(), "The total size of the compressed data in the exported part."}, {"total_size_bytes_uncompressed", std::make_shared(), "The total size of the uncompressed data in the exported part."}, {"bytes_read_uncompressed", std::make_shared(), "The number of uncompressed bytes read from the exported part."}, + {"memory_usage", std::make_shared(), "Current memory usage in bytes for the export operation."}, + {"peak_memory_usage", std::make_shared(), "Peak memory usage in bytes during the export operation."}, }; } @@ -54,6 +56,8 @@ void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr con res_columns[i++]->insert(export_info.total_size_bytes_compressed); res_columns[i++]->insert(export_info.total_size_bytes_uncompressed); res_columns[i++]->insert(export_info.bytes_read_uncompressed); + res_columns[i++]->insert(export_info.memory_usage); + res_columns[i++]->insert(export_info.peak_memory_usage); } } From e6f3ab7d2f97c60422595f2531b7f32af751636e Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 16 Sep 2025 16:22:40 -0300 Subject: [PATCH 04/12] fix --- src/Storages/MergeTree/MergeTreeData.cpp | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 479915de8e49..971827ed8827 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -5959,12 +5959,7 @@ void MergeTreeData::exportPartToTableImpl( manifest.create_time, local_context); - // Switch to the export's thread group for memory tracking - std::optional switcher; - if (exports_list_entry) - { - switcher.emplace((*exports_list_entry)->thread_group, "", /*allow_existing_group*/ true); - } + ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ""); auto metadata_snapshot = getInMemoryMetadataPtr(); Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); @@ -8755,7 +8750,7 @@ try { part_log_elem.rows_read = (*exports_entry)->rows_read; part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed; - part_log_elem.peak_memory_usage = (*exports_entry)->getMemoryTracker().getPeak(); + part_log_elem.peak_memory_usage = (*exports_entry)->getPeakMemoryUsage(); } if (profile_counters) From 8d3f65b9db6670c779807b81cff20465466f6f2f Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 17 Sep 2025 15:44:50 -0300 Subject: [PATCH 05/12] Update src/Storages/IStorage.h Co-authored-by: filimonov <1549571+filimonov@users.noreply.github.com> --- src/Storages/IStorage.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index fad47f7cac37..f594d034797d 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -445,7 +445,7 @@ class IStorage : public std::enable_shared_from_this, public TypePromo } /* - It is kind of hard to describe this API. It is currently only implemented in StorageObjectStorage. +It is currently only implemented in StorageObjectStorage. It is meant to be used to import merge tree data parts into object storage. It is similar to the write API, but it won't re-partition the data and should allow the filename to be set by the caller. */ From 2ac019c297bbc0f0439333052442792c8b95e824 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 17 Sep 2025 22:45:59 -0300 Subject: [PATCH 06/12] add profile events and persist some settings in the manifest --- src/Common/ProfileEvents.cpp | 4 + src/Core/Settings.cpp | 3 + src/Storages/IStorage.h | 8 +- src/Storages/MergeTree/ExportList.cpp | 3 + src/Storages/MergeTree/ExportList.h | 3 + src/Storages/MergeTree/MergeTreeData.cpp | 80 +++++++++++++------ .../MergeTree/MergeTreeExportManifest.h | 17 +++- .../ObjectStorage/StorageObjectStorage.cpp | 12 +-- .../ObjectStorage/StorageObjectStorage.h | 2 + .../StorageObjectStorageCluster.cpp | 6 +- .../StorageObjectStorageCluster.h | 2 + src/Storages/System/StorageSystemExports.cpp | 2 + 12 files changed, 108 insertions(+), 34 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 14cadc3b3587..91ce75d3a32f 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -28,6 +28,10 @@ M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.", ValueType::Bytes) \ M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.", ValueType::Number) \ M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.", ValueType::Number) \ + M(PartsExports, "Number of successful part exports.", ValueType::Number) \ + M(PartsExportFailures, "Number of failed part exports.", ValueType::Number) \ + M(PartsExportDuplicated, "Number of part exports that failed because target already exists.", ValueType::Number) \ + M(PartsExportTotalMilliseconds, "Total time spent on part export operations.", ValueType::Milliseconds) \ M(FailedQuery, "Number of failed queries.", ValueType::Number) \ M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.", ValueType::Number) \ M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.", ValueType::Number) \ diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 1d3ffebafe75..dd09207d76f0 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6687,6 +6687,9 @@ Allows to change the behaviour of the result type of `dateTrunc` function. Possible values: - 0 - When the second argument is `DateTime64/Date32` the return type will be `DateTime64/Date32` regardless of the time unit in the first argument. - 1 - For `Date32` the result is always `Date`. For `DateTime64` the result is `DateTime` for time units `second` and higher. +)", 0) \ + DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"( +Overwrite file if it already exists when exporting a merge tree part )", 0) \ \ /* ####################################################### */ \ diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index f594d034797d..19a30d952c8f 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -452,10 +452,12 @@ It is currently only implemented in StorageObjectStorage. virtual SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, ContextPtr /* context */) - { - throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); - } + { + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName()); + } /** Writes the data to a table in distributed manner. diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp index d726f6beee6e..bb3390828150 100644 --- a/src/Storages/MergeTree/ExportList.cpp +++ b/src/Storages/MergeTree/ExportList.cpp @@ -8,6 +8,7 @@ ExportsListElement::ExportsListElement( const StorageID & destination_table_id_, UInt64 part_size_, const String & part_name_, + const String & target_file_name_, UInt64 total_rows_to_read_, UInt64 total_size_bytes_compressed_, UInt64 total_size_bytes_uncompressed_, @@ -17,6 +18,7 @@ ExportsListElement::ExportsListElement( , destination_table_id(destination_table_id_) , part_size(part_size_) , part_name(part_name_) +, destination_file_path(target_file_name_) , total_rows_to_read(total_rows_to_read_) , total_size_bytes_compressed(total_size_bytes_compressed_) , total_size_bytes_uncompressed(total_size_bytes_uncompressed_) @@ -33,6 +35,7 @@ ExportInfo ExportsListElement::getInfo() const res.destination_database = destination_table_id.database_name; res.destination_table = destination_table_id.table_name; res.part_name = part_name; + res.destination_file_path = destination_file_path; res.rows_read = rows_read; res.total_rows_to_read = total_rows_to_read; res.total_size_bytes_compressed = total_size_bytes_compressed; diff --git a/src/Storages/MergeTree/ExportList.h b/src/Storages/MergeTree/ExportList.h index 6e2f52360092..f8de5a9eb1b0 100644 --- a/src/Storages/MergeTree/ExportList.h +++ b/src/Storages/MergeTree/ExportList.h @@ -23,6 +23,7 @@ struct ExportInfo String destination_database; String destination_table; String part_name; + String destination_file_path; UInt64 rows_read; UInt64 total_rows_to_read; UInt64 total_size_bytes_compressed; @@ -40,6 +41,7 @@ struct ExportsListElement : private boost::noncopyable const StorageID destination_table_id; const UInt64 part_size; const String part_name; + const String destination_file_path; UInt64 rows_read {0}; UInt64 total_rows_to_read {0}; UInt64 total_size_bytes_compressed {0}; @@ -56,6 +58,7 @@ struct ExportsListElement : private boost::noncopyable const StorageID & destination_table_id_, UInt64 part_size_, const String & part_name_, + const String & destination_file_path_, UInt64 total_rows_to_read_, UInt64 total_size_bytes_compressed_, UInt64 total_size_bytes_uncompressed_, diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 971827ed8827..8d0739ef7fa1 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -113,6 +113,7 @@ #include #include #include +#include #include #include @@ -154,6 +155,10 @@ namespace ProfileEvents extern const Event LoadedDataPartsMicroseconds; extern const Event RestorePartsSkippedFiles; extern const Event RestorePartsSkippedBytes; + extern const Event PartsExports; + extern const Event PartsExportTotalMilliseconds; + extern const Event PartsExportFailures; + extern const Event PartsExportDuplicated; } namespace CurrentMetrics @@ -198,6 +203,8 @@ namespace Setting extern const SettingsUInt64 merge_tree_storage_snapshot_sleep_ms; extern const SettingsBool allow_experimental_export_merge_tree_part; extern const SettingsUInt64 min_bytes_to_use_direct_io; + extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists; + extern const SettingsBool output_format_parallel_formatting; } namespace MergeTreeSetting @@ -310,6 +317,7 @@ namespace ErrorCodes extern const int CANNOT_FORGET_PARTITION; extern const int DATA_TYPE_CANNOT_BE_USED_IN_KEY; extern const int UNKNOWN_TABLE; + extern const int FILE_ALREADY_EXISTS; } static void checkSuspiciousIndices(const ASTFunction * index_function) @@ -5932,9 +5940,15 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP part_name, getStorageID().getFullTableName()); { + MergeTreeExportManifest manifest( + dest_storage->getStorageID(), + part, + query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], + query_context->getSettingsRef()[Setting::output_format_parallel_formatting]); + std::lock_guard lock(export_manifests_mutex); - if (!export_manifests.emplace(dest_storage->getStorageID(), part).second) + if (!export_manifests.emplace(std::move(manifest)).second) { throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'", part_name, dest_storage->getStorageID().getFullTableName()); @@ -5948,19 +5962,6 @@ void MergeTreeData::exportPartToTableImpl( const MergeTreeExportManifest & manifest, ContextPtr local_context) { - auto exports_list_entry = getContext()->getExportsList().insert( - getStorageID(), - manifest.destination_storage_id, - manifest.data_part->getBytesOnDisk(), - manifest.data_part->name, - manifest.data_part->rows_count, - manifest.data_part->getBytesOnDisk(), - manifest.data_part->getBytesUncompressedOnDisk(), - manifest.create_time, - local_context); - - ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ""); - auto metadata_snapshot = getInMemoryMetadataPtr(); Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); StorageSnapshotPtr storage_snapshot = getStorageSnapshot(metadata_snapshot, local_context); @@ -5987,16 +5988,29 @@ void MergeTreeData::exportPartToTableImpl( throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name); } - auto sink = destination_storage->import( - manifest.data_part->name, - block_with_partition_values, - local_context); + SinkToStoragePtr sink; + std::string destination_file_path; - /// Most likely the file has already been imported, so we can just return - if (!sink) + try { - std::lock_guard inner_lock(export_manifests_mutex); + auto context_copy = Context::createCopy(local_context); + context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); + sink = destination_storage->import( + manifest.data_part->name, + block_with_partition_values, + destination_file_path, + manifest.overwrite_file_if_exists, + context_copy); + } + catch (const Exception & e) + { + if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS) + { + ProfileEvents::increment(ProfileEvents::PartsExportDuplicated); + } + + std::lock_guard inner_lock(export_manifests_mutex); export_manifests.erase(manifest); return; } @@ -6037,6 +6051,20 @@ void MergeTreeData::exportPartToTableImpl( local_context, getLogger("ExportPartition")); + auto exports_list_entry = getContext()->getExportsList().insert( + getStorageID(), + manifest.destination_storage_id, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->name, + destination_file_path, + manifest.data_part->rows_count, + manifest.data_part->getBytesOnDisk(), + manifest.data_part->getBytesUncompressedOnDisk(), + manifest.create_time, + local_context); + + ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ""); + QueryPlanOptimizationSettings optimization_settings(local_context); auto pipeline_settings = BuildQueryPipelineSettings(local_context); auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); @@ -6070,10 +6098,15 @@ void MergeTreeData::exportPartToTableImpl( exports_list_entry.get()); export_manifests.erase(manifest); + + ProfileEvents::increment(ProfileEvents::PartsExports); + ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast((*exports_list_entry)->elapsed * 1000)); } - catch (const Exception &) + catch (...) { - tryLogCurrentException(__PRETTY_FUNCTION__, "Exception is in export part task"); + tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while exporting the part {}. User should retry.", manifest.data_part->name)); + + ProfileEvents::increment(ProfileEvents::PartsExportFailures); std::lock_guard inner_lock(export_manifests_mutex); writePartLog( @@ -8751,6 +8784,7 @@ try part_log_elem.rows_read = (*exports_entry)->rows_read; part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed; part_log_elem.peak_memory_usage = (*exports_entry)->getPeakMemoryUsage(); + part_log_elem.path_on_disk = (*exports_entry)->destination_file_path; } if (profile_counters) diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h index 89aed701f993..36831fd132ba 100644 --- a/src/Storages/MergeTree/MergeTreeExportManifest.h +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -8,9 +8,24 @@ struct MergeTreeExportManifest { using DataPartPtr = std::shared_ptr; + + MergeTreeExportManifest( + const StorageID & destination_storage_id_, + const DataPartPtr & data_part_, + bool overwrite_file_if_exists_, + bool parallel_formatting_) + : destination_storage_id(destination_storage_id_), + data_part(data_part_), + overwrite_file_if_exists(overwrite_file_if_exists_), + parallel_formatting(parallel_formatting_), + create_time(time(nullptr)) {} + StorageID destination_storage_id; DataPartPtr data_part; - time_t create_time = time(nullptr); + bool overwrite_file_if_exists; + bool parallel_formatting; + + time_t create_time; mutable bool in_progress = false; bool operator<(const MergeTreeExportManifest & rhs) const diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 16a105d86142..d256fa36f00e 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -49,6 +49,7 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; extern const int LOGICAL_ERROR; extern const int INCORRECT_DATA; + extern const int FILE_ALREADY_EXISTS; } String StorageObjectStorage::getPathSample(ContextPtr context) @@ -453,6 +454,8 @@ bool StorageObjectStorage::supportsImport() const SinkToStoragePtr StorageObjectStorage::import( const std::string & file_name, Block & block_with_partition_values, + std::string & destination_file_path, + bool overwrite_if_exists, ContextPtr local_context) { std::string partition_key; @@ -467,16 +470,15 @@ SinkToStoragePtr StorageObjectStorage::import( } } - const auto file_path = configuration->getPathForWrite(partition_key, file_name).path; + destination_file_path = configuration->getPathForWrite(partition_key, file_name).path; - if (object_storage->exists(StoredObject(file_path))) + if (!overwrite_if_exists && object_storage->exists(StoredObject(destination_file_path))) { - LOG_INFO(getLogger("StorageObjectStorage"), "File {} already exists, skipping import", file_path); - return nullptr; + throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "File {} already exists", destination_file_path); } return std::make_shared( - file_path, + destination_file_path, object_storage, configuration, format_settings, diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index c5f4ae7c96d4..f12c6bd6077f 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -106,6 +106,8 @@ class StorageObjectStorage : public IStorage SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, ContextPtr /* context */) override; void truncate( diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 5c726bc7160e..03fcd9962c4a 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -612,12 +612,14 @@ bool StorageObjectStorageCluster::supportsImport() const SinkToStoragePtr StorageObjectStorageCluster::import( const std::string & file_name, Block & block_with_partition_values, + std::string & destination_file_path, + bool overwrite_if_exists, ContextPtr context) { if (pure_storage) - return pure_storage->import(file_name, block_with_partition_values, context); + return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context); - return IStorageCluster::import(file_name, block_with_partition_values, context); + return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context); } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 0c07aff7cb59..374d35877048 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -63,6 +63,8 @@ class StorageObjectStorageCluster : public IStorageCluster SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, + std::string & /* destination_file_path */, + bool /* overwrite_if_exists */, ContextPtr /* context */) override; private: diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp index 6cb3befb4177..979fa21708d6 100644 --- a/src/Storages/System/StorageSystemExports.cpp +++ b/src/Storages/System/StorageSystemExports.cpp @@ -22,6 +22,7 @@ ColumnsDescription StorageSystemExports::getColumnsDescription() {"destination_table", std::make_shared(), "Name of the destination table."}, {"create_time", std::make_shared(), "Date and time when the export command was submitted for execution."}, {"part_name", std::make_shared(), "Name of the part"}, + {"destination_file_path", std::make_shared(), "File path where the part is being exported."}, {"elapsed", std::make_shared(), "The time elapsed (in seconds) since the export started."}, {"rows_read", std::make_shared(), "The number of rows read from the exported part."}, {"total_rows_to_read", std::make_shared(), "The total number of rows to read from the exported part."}, @@ -50,6 +51,7 @@ void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr con res_columns[i++]->insert(export_info.destination_table); res_columns[i++]->insert(export_info.create_time); res_columns[i++]->insert(export_info.part_name); + res_columns[i++]->insert(export_info.destination_file_path); res_columns[i++]->insert(export_info.elapsed); res_columns[i++]->insert(export_info.rows_read); res_columns[i++]->insert(export_info.total_rows_to_read); From bab19e3e73393bad89aed228758bcc951315c20c Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 18 Sep 2025 06:07:00 -0300 Subject: [PATCH 07/12] fix settingshistory --- src/Core/SettingsChangesHistory.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index d62fdc92bf5d..30fbb412f019 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -78,6 +78,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_max_nodes", 0, 0, "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.6", { From e8c2076bfdccb6593b8dc87688f569f18c8bc984 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 18 Sep 2025 08:44:57 -0300 Subject: [PATCH 08/12] store checksum of the part --- src/Storages/MergeTree/MergeTreeData.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 8d0739ef7fa1..0c1bc901c763 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -5997,7 +5997,7 @@ void MergeTreeData::exportPartToTableImpl( context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); sink = destination_storage->import( - manifest.data_part->name, + manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), block_with_partition_values, destination_file_path, manifest.overwrite_file_if_exists, From 7f7085b6bec5a1e42ed7aedb4b59745837a75510 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 18 Sep 2025 13:30:21 -0300 Subject: [PATCH 09/12] adapt tests --- ...t_merge_tree_part_to_object_storage.reference | 16 ++++++++-------- ...2_export_merge_tree_part_to_object_storage.sh | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference index 75c32111f7a3..9f5316c2623d 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference @@ -1,14 +1,14 @@ ---- Export 2020_1_1_0 and 2021_2_2_0 ---- Both data parts should appear -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3 -test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4 +1 2020 +2 2020 +3 2020 +4 2021 ---- Export the same part again, it should be idempotent -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 1 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 2 -test/s3_table_NAME/year=2020/2020_1_1_0.parquet 3 -test/s3_table_NAME/year=2021/2021_2_2_0.parquet 4 +1 2020 +2 2020 +3 2020 +4 2021 ---- Data in roundtrip MergeTree table (should match s3_table) 1 2020 2 2020 diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh index 5e886eaf4755..7b3e9d3bb3d1 100755 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh @@ -23,16 +23,16 @@ query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTING query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" echo "---- Both data parts should appear" -query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id" +query "SELECT * FROM $s3_table ORDER BY id" echo "---- Export the same part again, it should be idempotent" query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1" -query "SELECT DISTINCT ON (id) replaceRegexpAll(_path, '$s3_table', 's3_table_NAME'), id FROM $s3_table ORDER BY id" +query "SELECT * FROM $s3_table ORDER BY id" query "CREATE TABLE $mt_table_roundtrip ENGINE = MergeTree() PARTITION BY year ORDER BY tuple() AS SELECT * FROM $s3_table" echo "---- Data in roundtrip MergeTree table (should match s3_table)" -query "SELECT DISTINCT ON (id) * FROM $mt_table_roundtrip ORDER BY id" +query "SELECT * FROM $s3_table ORDER BY id" query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip" From cf638d31061831462debf051dcf71c97adbb042d Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 18 Sep 2025 14:56:58 -0300 Subject: [PATCH 10/12] adapt tests --- ...t_merge_tree_part_to_object_storage.reference | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference index 9f5316c2623d..d9089d37dd99 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference @@ -1,14 +1,14 @@ ---- Export 2020_1_1_0 and 2021_2_2_0 ---- Both data parts should appear -1 2020 -2 2020 -3 2020 -4 2021 +1 2020 +2 2020 +3 2020 +4 2021 ---- Export the same part again, it should be idempotent -1 2020 -2 2020 -3 2020 -4 2021 +1 2020 +2 2020 +3 2020 +4 2021 ---- Data in roundtrip MergeTree table (should match s3_table) 1 2020 2 2020 From 947f7d74002c994c2653f09edc2d3d50df77d767 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 25 Sep 2025 10:13:58 -0300 Subject: [PATCH 11/12] add adjustOnBackgroundTaskEnd for export --- src/Storages/MergeTree/ExportList.cpp | 5 +++++ src/Storages/MergeTree/ExportList.h | 2 ++ 2 files changed, 7 insertions(+) diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp index bb3390828150..63ecc6d7c1f1 100644 --- a/src/Storages/MergeTree/ExportList.cpp +++ b/src/Storages/MergeTree/ExportList.cpp @@ -27,6 +27,11 @@ ExportsListElement::ExportsListElement( thread_group = ThreadGroup::createForBackgroundProcess(context); } +ExportsListElement::~ExportsListElement() +{ + background_memory_tracker.adjustOnBackgroundTaskEnd(&thread_group->memory_tracker); +} + ExportInfo ExportsListElement::getInfo() const { ExportInfo res; diff --git a/src/Storages/MergeTree/ExportList.h b/src/Storages/MergeTree/ExportList.h index f8de5a9eb1b0..ade18b69480c 100644 --- a/src/Storages/MergeTree/ExportList.h +++ b/src/Storages/MergeTree/ExportList.h @@ -65,6 +65,8 @@ struct ExportsListElement : private boost::noncopyable time_t create_time_, const ContextPtr & context); + ~ExportsListElement(); + ExportInfo getInfo() const; UInt64 getMemoryUsage() const; From 5376495c3832a8ee3a2cc346974b2dda1eede24e Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 25 Sep 2025 15:37:47 -0300 Subject: [PATCH 12/12] add missing increment --- src/Storages/MergeTree/MergeTreeData.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 0c1bc901c763..bdceeca0abdc 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6010,6 +6010,8 @@ void MergeTreeData::exportPartToTableImpl( ProfileEvents::increment(ProfileEvents::PartsExportDuplicated); } + ProfileEvents::increment(ProfileEvents::PartsExportFailures); + std::lock_guard inner_lock(export_manifests_mutex); export_manifests.erase(manifest); return;