From c69dcbcc19d7c3bc31bd2b805c70d7ced80254c3 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 28 Oct 2025 11:49:53 +0100 Subject: [PATCH 01/11] preserve parquet specific parallel formatting --- src/Storages/MergeTree/MergeTreeData.cpp | 5 ++++- src/Storages/MergeTree/MergeTreeExportManifest.h | 6 +++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index e9cf056b1135..a0148297be5a 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -213,6 +213,7 @@ namespace Setting extern const SettingsUInt64 min_bytes_to_use_direct_io; extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists; extern const SettingsBool output_format_parallel_formatting; + extern const SettingsBool output_format_parallel_formatting_parquet; } namespace MergeTreeSetting @@ -6244,7 +6245,8 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP dest_storage->getStorageID(), part, query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], - query_context->getSettingsRef()[Setting::output_format_parallel_formatting]); + query_context->getSettingsRef()[Setting::output_format_parallel_formatting], + query_context->getSettingsRef()[Setting::output_format_parallel_formatting_parquet]); std::lock_guard lock(export_manifests_mutex); @@ -6292,6 +6294,7 @@ void MergeTreeData::exportPartToTableImpl( { auto context_copy = Context::createCopy(local_context); context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); + context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parallel_formatting_parquet); sink = destination_storage->import( manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h index 36831fd132ba..cb1510f11a57 100644 --- a/src/Storages/MergeTree/MergeTreeExportManifest.h +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -13,17 +13,21 @@ struct MergeTreeExportManifest const StorageID & destination_storage_id_, const DataPartPtr & data_part_, bool overwrite_file_if_exists_, - bool parallel_formatting_) + bool parallel_formatting_, + bool parallel_formatting_parquet_) : destination_storage_id(destination_storage_id_), data_part(data_part_), overwrite_file_if_exists(overwrite_file_if_exists_), parallel_formatting(parallel_formatting_), + parallel_formatting_parquet(parallel_formatting_parquet_), create_time(time(nullptr)) {} StorageID destination_storage_id; DataPartPtr data_part; bool overwrite_file_if_exists; bool parallel_formatting; + /// parquet has a different setting for parallel formatting + bool parallel_formatting_parquet; time_t create_time; mutable bool in_progress = false; From 1b4135f1b6f091c7c946210dc82712954ffabf93 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 28 Oct 2025 12:21:36 +0100 Subject: [PATCH 02/11] nullopt format --- src/Storages/ObjectStorage/StorageObjectStorage.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 377a1f5e9beb..711ef86664f0 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -507,7 +508,7 @@ SinkToStoragePtr StorageObjectStorage::import( destination_file_path, object_storage, configuration, - format_settings, + std::nullopt, /// passing nullopt to force rebuild for format_settings based on query context std::make_shared(getInMemoryMetadataPtr()->getSampleBlock()), local_context); } From c5c00cd06f2d1ee0a759a5287fb9093baee7be9b Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 28 Oct 2025 12:51:54 +0100 Subject: [PATCH 03/11] tmp --- src/Storages/MergeTree/MergeTreeData.cpp | 11 +++-------- src/Storages/MergeTree/MergeTreeExportManifest.h | 14 +++----------- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a0148297be5a..b59e0a29c621 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6244,9 +6244,7 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP MergeTreeExportManifest manifest( dest_storage->getStorageID(), part, - query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], - query_context->getSettingsRef()[Setting::output_format_parallel_formatting], - query_context->getSettingsRef()[Setting::output_format_parallel_formatting_parquet]); + query_context); std::lock_guard lock(export_manifests_mutex); @@ -6291,17 +6289,14 @@ void MergeTreeData::exportPartToTableImpl( std::string destination_file_path; try - { - auto context_copy = Context::createCopy(local_context); - context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); - context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parallel_formatting_parquet); + { sink = destination_storage->import( manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), block_with_partition_values, destination_file_path, manifest.overwrite_file_if_exists, - context_copy); + local_context); } catch (const Exception & e) { diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h index cb1510f11a57..c13a732ed749 100644 --- a/src/Storages/MergeTree/MergeTreeExportManifest.h +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -12,23 +12,15 @@ struct MergeTreeExportManifest MergeTreeExportManifest( const StorageID & destination_storage_id_, const DataPartPtr & data_part_, - bool overwrite_file_if_exists_, - bool parallel_formatting_, - bool parallel_formatting_parquet_) + ContextPtr context_) : destination_storage_id(destination_storage_id_), data_part(data_part_), - overwrite_file_if_exists(overwrite_file_if_exists_), - parallel_formatting(parallel_formatting_), - parallel_formatting_parquet(parallel_formatting_parquet_), + context(context_), create_time(time(nullptr)) {} StorageID destination_storage_id; DataPartPtr data_part; - bool overwrite_file_if_exists; - bool parallel_formatting; - /// parquet has a different setting for parallel formatting - bool parallel_formatting_parquet; - + ContextPtr context; time_t create_time; mutable bool in_progress = false; From 03e64f0419ca2054c72f34502f065bb1ed231770 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Tue, 28 Oct 2025 14:39:34 +0100 Subject: [PATCH 04/11] build fixes --- src/Storages/MergeTree/MergeTreeData.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index b59e0a29c621..c32ffb3202b2 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -212,8 +212,6 @@ namespace Setting extern const SettingsBool allow_experimental_export_merge_tree_part; extern const SettingsUInt64 min_bytes_to_use_direct_io; extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists; - extern const SettingsBool output_format_parallel_formatting; - extern const SettingsBool output_format_parallel_formatting_parquet; } namespace MergeTreeSetting @@ -6295,7 +6293,7 @@ void MergeTreeData::exportPartToTableImpl( manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), block_with_partition_values, destination_file_path, - manifest.overwrite_file_if_exists, + local_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], local_context); } catch (const Exception & e) From d58d9c8ed25972b667dc97c7272dce9334a27db2 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 29 Oct 2025 10:48:09 +0100 Subject: [PATCH 05/11] Revert "build fixes" This reverts commit 03e64f0419ca2054c72f34502f065bb1ed231770. --- src/Storages/MergeTree/MergeTreeData.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index c32ffb3202b2..b59e0a29c621 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -212,6 +212,8 @@ namespace Setting extern const SettingsBool allow_experimental_export_merge_tree_part; extern const SettingsUInt64 min_bytes_to_use_direct_io; extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists; + extern const SettingsBool output_format_parallel_formatting; + extern const SettingsBool output_format_parallel_formatting_parquet; } namespace MergeTreeSetting @@ -6293,7 +6295,7 @@ void MergeTreeData::exportPartToTableImpl( manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), block_with_partition_values, destination_file_path, - local_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], + manifest.overwrite_file_if_exists, local_context); } catch (const Exception & e) From 56aca7bbb1dea8577c42cf03f6bbecf356c64ba4 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 29 Oct 2025 10:48:20 +0100 Subject: [PATCH 06/11] Revert "tmp" This reverts commit c5c00cd06f2d1ee0a759a5287fb9093baee7be9b. --- src/Storages/MergeTree/MergeTreeData.cpp | 11 ++++++++--- src/Storages/MergeTree/MergeTreeExportManifest.h | 14 +++++++++++--- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index b59e0a29c621..a0148297be5a 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6244,7 +6244,9 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP MergeTreeExportManifest manifest( dest_storage->getStorageID(), part, - query_context); + query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], + query_context->getSettingsRef()[Setting::output_format_parallel_formatting], + query_context->getSettingsRef()[Setting::output_format_parallel_formatting_parquet]); std::lock_guard lock(export_manifests_mutex); @@ -6289,14 +6291,17 @@ void MergeTreeData::exportPartToTableImpl( std::string destination_file_path; try - { + { + auto context_copy = Context::createCopy(local_context); + context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); + context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parallel_formatting_parquet); sink = destination_storage->import( manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), block_with_partition_values, destination_file_path, manifest.overwrite_file_if_exists, - local_context); + context_copy); } catch (const Exception & e) { diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h index c13a732ed749..cb1510f11a57 100644 --- a/src/Storages/MergeTree/MergeTreeExportManifest.h +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -12,15 +12,23 @@ struct MergeTreeExportManifest MergeTreeExportManifest( const StorageID & destination_storage_id_, const DataPartPtr & data_part_, - ContextPtr context_) + bool overwrite_file_if_exists_, + bool parallel_formatting_, + bool parallel_formatting_parquet_) : destination_storage_id(destination_storage_id_), data_part(data_part_), - context(context_), + overwrite_file_if_exists(overwrite_file_if_exists_), + parallel_formatting(parallel_formatting_), + parallel_formatting_parquet(parallel_formatting_parquet_), create_time(time(nullptr)) {} StorageID destination_storage_id; DataPartPtr data_part; - ContextPtr context; + bool overwrite_file_if_exists; + bool parallel_formatting; + /// parquet has a different setting for parallel formatting + bool parallel_formatting_parquet; + time_t create_time; mutable bool in_progress = false; From 290613f1d2c41298057f146966aecd53afd589e4 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 29 Oct 2025 10:51:40 +0100 Subject: [PATCH 07/11] path logical fix --- src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h index a7e3b102e3e7..a66965984bb0 100644 --- a/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h +++ b/src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h @@ -55,7 +55,7 @@ namespace DB result += raw_path; - if (raw_path.back() != '/') + if (!raw_path.empty() && raw_path.back() != '/') { result += "/"; } From 846077530739a36db4bed60f26e95f731f75a72d Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 29 Oct 2025 10:53:29 +0100 Subject: [PATCH 08/11] typo --- src/Storages/MergeTree/MergeTreeData.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a0148297be5a..27408d9fb749 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -213,7 +213,7 @@ namespace Setting extern const SettingsUInt64 min_bytes_to_use_direct_io; extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists; extern const SettingsBool output_format_parallel_formatting; - extern const SettingsBool output_format_parallel_formatting_parquet; + extern const SettingsBool output_format_parquet_parallel_encoding; } namespace MergeTreeSetting @@ -6246,7 +6246,7 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP part, query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], query_context->getSettingsRef()[Setting::output_format_parallel_formatting], - query_context->getSettingsRef()[Setting::output_format_parallel_formatting_parquet]); + query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding]); std::lock_guard lock(export_manifests_mutex); From 803a30905afa03b6fadb8ed65ac182a2fad92430 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 29 Oct 2025 10:58:02 +0100 Subject: [PATCH 09/11] save max threads as well --- src/Storages/MergeTree/MergeTreeData.cpp | 4 +++- src/Storages/MergeTree/MergeTreeExportManifest.h | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 27408d9fb749..ddceff7f4cb7 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6246,7 +6246,8 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP part, query_context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists], query_context->getSettingsRef()[Setting::output_format_parallel_formatting], - query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding]); + query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding], + query_context->getSettingsRef()[Setting::max_threads]); std::lock_guard lock(export_manifests_mutex); @@ -6295,6 +6296,7 @@ void MergeTreeData::exportPartToTableImpl( auto context_copy = Context::createCopy(local_context); context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parallel_formatting_parquet); + context_copy->setSetting("max_threads", manifest.max_threads); sink = destination_storage->import( manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h index cb1510f11a57..6fee4a73c76f 100644 --- a/src/Storages/MergeTree/MergeTreeExportManifest.h +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -14,12 +14,14 @@ struct MergeTreeExportManifest const DataPartPtr & data_part_, bool overwrite_file_if_exists_, bool parallel_formatting_, - bool parallel_formatting_parquet_) + bool parallel_formatting_parquet_, + std::size_t max_threads_) : destination_storage_id(destination_storage_id_), data_part(data_part_), overwrite_file_if_exists(overwrite_file_if_exists_), parallel_formatting(parallel_formatting_), parallel_formatting_parquet(parallel_formatting_parquet_), + max_threads(max_threads_), create_time(time(nullptr)) {} StorageID destination_storage_id; @@ -28,6 +30,7 @@ struct MergeTreeExportManifest bool parallel_formatting; /// parquet has a different setting for parallel formatting bool parallel_formatting_parquet; + std::size_t max_threads; time_t create_time; mutable bool in_progress = false; From 5dcfac61dee8c265e7b98ed7f73a569ad09ef959 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 29 Oct 2025 11:04:39 +0100 Subject: [PATCH 10/11] another random fix --- src/Storages/MergeTree/MergeTreeSequentialSource.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index cc43895f0c76..56613657e68a 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -169,7 +169,8 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( addThrottler(read_settings.local_throttler, context->getMergesThrottler()); break; case Export: - read_settings.local_throttler = context->getExportsThrottler(); + addThrottler(read_settings.local_throttler, context->getExportsThrottler()); + addThrottler(read_settings.remote_throttler, context->getExportsThrottler()); break; } From 8238426b3b3fc4027f1560f784dfd22e259c8668 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 3 Nov 2025 20:15:35 +0100 Subject: [PATCH 11/11] rename property to match setting --- src/Storages/MergeTree/MergeTreeData.cpp | 2 +- src/Storages/MergeTree/MergeTreeExportManifest.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index ddceff7f4cb7..6e7dcb2cec6f 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6295,7 +6295,7 @@ void MergeTreeData::exportPartToTableImpl( { auto context_copy = Context::createCopy(local_context); context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting); - context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parallel_formatting_parquet); + context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding); context_copy->setSetting("max_threads", manifest.max_threads); sink = destination_storage->import( diff --git a/src/Storages/MergeTree/MergeTreeExportManifest.h b/src/Storages/MergeTree/MergeTreeExportManifest.h index 6fee4a73c76f..5e3d264f47eb 100644 --- a/src/Storages/MergeTree/MergeTreeExportManifest.h +++ b/src/Storages/MergeTree/MergeTreeExportManifest.h @@ -20,7 +20,7 @@ struct MergeTreeExportManifest data_part(data_part_), overwrite_file_if_exists(overwrite_file_if_exists_), parallel_formatting(parallel_formatting_), - parallel_formatting_parquet(parallel_formatting_parquet_), + parquet_parallel_encoding(parallel_formatting_parquet_), max_threads(max_threads_), create_time(time(nullptr)) {} @@ -29,7 +29,7 @@ struct MergeTreeExportManifest bool overwrite_file_if_exists; bool parallel_formatting; /// parquet has a different setting for parallel formatting - bool parallel_formatting_parquet; + bool parquet_parallel_encoding; std::size_t max_threads; time_t create_time;