diff --git a/docs/en/engines/table-engines/mergetree-family/part_export.md b/docs/en/engines/table-engines/mergetree-family/part_export.md index 0a580726b5dd..1ef6c437585c 100644 --- a/docs/en/engines/table-engines/mergetree-family/part_export.md +++ b/docs/en/engines/table-engines/mergetree-family/part_export.md @@ -2,7 +2,7 @@ ## Overview -The `ALTER TABLE EXPORT PART` command exports individual MergeTree data parts to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format. +The `ALTER TABLE EXPORT PART` command exports individual MergeTree data parts to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format. A commit file is shipped to the same destination directory containing all data files exported within that transaction. **Key Characteristics:** - **Experimental feature** - must be enabled via `allow_experimental_export_merge_tree_part` setting @@ -48,6 +48,18 @@ Source and destination tables must be 100% compatible: - **Default**: `false` - **Description**: If set to `true`, it will overwrite the file. Otherwise, fails with exception. +### `export_merge_tree_part_max_bytes_per_file` (Optional) + +- **Type**: `UInt64` +- **Default**: `0` +- **Description**: Maximum number of bytes to write to a single file when exporting a merge tree part. 0 means no limit. This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. Using this might break idempotency, use it with care. + +### `export_merge_tree_part_max_rows_per_file` (Optional) + +- **Type**: `UInt64` +- **Default**: `0` +- **Description**: Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit. This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. Using this might break idempotency, use it with care. + ## Examples ### Basic Export to S3 @@ -93,7 +105,7 @@ destination_database: default destination_table: destination_table create_time: 2025-11-19 09:09:11 part_name: 20251016-365_1_1_0 -destination_file_path: table_root/eventDate=2025-10-16/retention=365/20251016-365_1_1_0_17B2F6CD5D3C18E787C07AE3DAF16EB1.parquet +destination_file_paths: ['table_root/eventDate=2025-10-16/retention=365/20251016-365_1_1_0_17B2F6CD5D3C18E787C07AE3DAF16EB1.1.parquet'] elapsed: 2.04845441 rows_read: 1138688 -- 1.14 million total_rows_to_read: 550961374 -- 550.96 million @@ -138,7 +150,8 @@ partition_id: 2021 partition: 2021 part_type: Compact disk_name: default -path_on_disk: year=2021/2021_0_0_0_78C704B133D41CB0EF64DD2A9ED3B6BA.parquet +path_on_disk: +remote_file_paths ['year=2021/2021_0_0_0_78C704B133D41CB0EF64DD2A9ED3B6BA.1.parquet'] rows: 1 size_in_bytes: 272 merged_from: ['2021_0_0_0'] @@ -158,3 +171,99 @@ ProfileEvents: {} - `PartsExportDuplicated` - Number of part exports that failed because target already exists. 
- `PartsExportTotalMilliseconds` - Total time +### Split large files + +```sql +alter table big_table export part '2025_0_32_3' to table replicated_big_destination SETTINGS export_merge_tree_part_max_bytes_per_file=10000000, output_format_parquet_row_group_size_bytes=5000000; + +arthur :) select * from system.exports; + +SELECT * +FROM system.exports + +Query id: d78d9ce5-cfbc-4957-b7dd-bc8129811634 + +Row 1: +────── +source_database: default +source_table: big_table +destination_database: default +destination_table: replicated_big_destination +create_time: 2025-12-15 13:12:48 +part_name: 2025_0_32_3 +destination_file_paths: ['replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.1.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.2.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.3.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.4.parquet'] +elapsed: 14.360427274 +rows_read: 10256384 -- 10.26 million +total_rows_to_read: 10485760 -- 10.49 million +total_size_bytes_compressed: 83779395 -- 83.78 million +total_size_bytes_uncompressed: 10611691600 -- 10.61 billion +bytes_read_uncompressed: 10440998912 -- 10.44 billion +memory_usage: 89795477 -- 89.80 million +peak_memory_usage: 107362133 -- 107.36 million + +1 row in set. Elapsed: 0.014 sec. + +arthur :) select * from system.part_log where event_type = 'ExportPart' order by event_time desc limit 1 format Vertical; + +SELECT * +FROM system.part_log +WHERE event_type = 'ExportPart' +ORDER BY event_time DESC +LIMIT 1 +FORMAT Vertical + +Query id: 95128b01-b751-4726-8e3e-320728ac6af7 + +Row 1: +────── +hostname: arthur +query_id: +event_type: ExportPart +merge_reason: NotAMerge +merge_algorithm: Undecided +event_date: 2025-12-15 +event_time: 2025-12-15 13:13:03 +event_time_microseconds: 2025-12-15 13:13:03.197492 +duration_ms: 14673 +database: default +table: big_table +table_uuid: a3eeeea0-295c-41a3-84ef-6b5463dbbe8c +part_name: 2025_0_32_3 +partition_id: 2025 +partition: 2025 +part_type: Wide +disk_name: default +path_on_disk: ./store/a3e/a3eeeea0-295c-41a3-84ef-6b5463dbbe8c/2025_0_32_3/ +remote_file_paths: ['replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.1.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.2.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.3.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.4.parquet'] +rows: 10485760 -- 10.49 million +size_in_bytes: 83779395 -- 83.78 million +merged_from: ['2025_0_32_3'] +bytes_uncompressed: 10611691600 -- 10.61 billion +read_rows: 10485760 -- 10.49 million +read_bytes: 10674503680 -- 10.67 billion +peak_memory_usage: 107362133 -- 107.36 million +error: 0 +exception: +ProfileEvents: {} + +1 row in set. Elapsed: 0.044 sec. + +arthur :) select _path, formatReadableSize(_size) as _size from s3(s3_conn, filename='**', format=One); + +SELECT + _path, + formatReadableSize(_size) AS _size +FROM s3(s3_conn, filename = '**', format = One) + +Query id: c48ae709-f590-4d1b-8158-191f8d628966 + + ┌─_path────────────────────────────────────────────────────────────────────────────────┬─_size─────┐ +1. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.1.parquet │ 17.36 MiB │ +2. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.2.parquet │ 17.32 MiB │ +3. 
│ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.4.parquet │ 5.04 MiB │ +4. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.3.parquet │ 17.40 MiB │ +5. │ test/replicated_big/year=2025/commit_2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7 │ 320.00 B │ + └──────────────────────────────────────────────────────────────────────────────────────┴───────────┘ + +5 rows in set. Elapsed: 0.072 sec. +``` diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index f8512a6e6c2d..2f0f02f63491 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6899,6 +6899,14 @@ Possible values: - `` (empty value) - use session timezone Default value is `UTC`. +)", 0) \ + DECLARE(UInt64, export_merge_tree_part_max_bytes_per_file, 0, R"( +Maximum number of bytes to write to a single file when exporting a merge tree part. 0 means no limit. +This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. +)", 0) \ + DECLARE(UInt64, export_merge_tree_part_max_rows_per_file, 0, R"( +Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit. +This is not a hard limit, and it highly depends on the output format granularity and input source chunk size. )", 0) \ \ /* ####################################################### */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 3e3f5e3f7608..09bdf3a3bcb7 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -55,7 +55,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."}, {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}, {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."}, - {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."} + {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}, + {"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."}, + {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."}, }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Interpreters/PartLog.cpp b/src/Interpreters/PartLog.cpp index aca3b4cf6870..95596cd1dc08 100644 --- a/src/Interpreters/PartLog.cpp +++ b/src/Interpreters/PartLog.cpp @@ -134,6 +134,7 @@ ColumnsDescription PartLogElement::getColumnsDescription() {"part_type", std::make_shared(), "The type of the part. 
Possible values: Wide and Compact."}, {"disk_name", std::make_shared(), "The disk name data part lies on."}, {"path_on_disk", std::make_shared(), "Absolute path to the folder with data part files."}, + {"remote_file_paths", std::make_shared(std::make_shared()), "In case of an export operation to remote storages, the file paths a given export generated"}, {"rows", std::make_shared(), "The number of rows in the data part."}, {"size_in_bytes", std::make_shared(), "Size of the data part on disk in bytes."}, @@ -187,6 +188,12 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const columns[i++]->insert(disk_name); columns[i++]->insert(path_on_disk); + Array remote_file_paths_array; + remote_file_paths_array.reserve(remote_file_paths.size()); + for (const auto & remote_file_path : remote_file_paths) + remote_file_paths_array.push_back(remote_file_path); + columns[i++]->insert(remote_file_paths_array); + columns[i++]->insert(rows); columns[i++]->insert(bytes_compressed_on_disk); diff --git a/src/Interpreters/PartLog.h b/src/Interpreters/PartLog.h index 4f58069dae55..a2571448720f 100644 --- a/src/Interpreters/PartLog.h +++ b/src/Interpreters/PartLog.h @@ -71,6 +71,7 @@ struct PartLogElement String partition; String disk_name; String path_on_disk; + std::vector remote_file_paths; MergeTreeDataPartType part_type; diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index 81f61b5b9f12..6d6d56222c19 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -62,14 +62,14 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry struct ExportReplicatedMergeTreePartitionProcessedPartEntry { String part_name; - String path_in_destination; + std::vector paths_in_destination; String finished_by; std::string toJsonString() const { Poco::JSON::Object json; json.set("part_name", part_name); - json.set("path_in_destination", path_in_destination); + json.set("paths_in_destination", paths_in_destination); json.set("finished_by", finished_by); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); @@ -86,7 +86,11 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry ExportReplicatedMergeTreePartitionProcessedPartEntry entry; entry.part_name = json->getValue("part_name"); - entry.path_in_destination = json->getValue("path_in_destination"); + + const auto paths_in_destination_array = json->getArray("paths_in_destination"); + for (size_t i = 0; i < paths_in_destination_array->size(); ++i) + entry.paths_in_destination.emplace_back(paths_in_destination_array->getElement(static_cast(i))); + entry.finished_by = json->getValue("finished_by"); return entry; @@ -108,6 +112,8 @@ struct ExportReplicatedMergeTreePartitionManifest size_t max_threads; bool parallel_formatting; bool parquet_parallel_encoding; + size_t max_bytes_per_file; + size_t max_rows_per_file; MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; std::string toJsonString() const @@ -127,6 +133,8 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("parallel_formatting", parallel_formatting); json.set("max_threads", max_threads); json.set("parquet_parallel_encoding", parquet_parallel_encoding); + json.set("max_bytes_per_file", max_bytes_per_file); + json.set("max_rows_per_file", max_rows_per_file); json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy))); 
json.set("create_time", create_time); json.set("max_retries", max_retries); @@ -160,7 +168,8 @@ struct ExportReplicatedMergeTreePartitionManifest manifest.max_threads = json->getValue("max_threads"); manifest.parallel_formatting = json->getValue("parallel_formatting"); manifest.parquet_parallel_encoding = json->getValue("parquet_parallel_encoding"); - + manifest.max_bytes_per_file = json->getValue("max_bytes_per_file"); + manifest.max_rows_per_file = json->getValue("max_rows_per_file"); if (json->has("file_already_exists_policy")) { const auto file_already_exists_policy = magic_enum::enum_cast(json->getValue("file_already_exists_policy")); diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 15938d9f3c22..5d23fec9c2ac 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -466,8 +466,10 @@ It is currently only implemented in StorageObjectStorage. virtual SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - std::string & /* destination_file_path */, + const std::function & /* new_file_path_callback */, bool /* overwrite_if_exists */, + std::size_t /* max_bytes_per_file */, + std::size_t /* max_rows_per_file */, const std::optional & /* format_settings */, ContextPtr /* context */) { diff --git a/src/Storages/MergeTree/ExportList.cpp b/src/Storages/MergeTree/ExportList.cpp index 0239f841dc69..3ee75fde831e 100644 --- a/src/Storages/MergeTree/ExportList.cpp +++ b/src/Storages/MergeTree/ExportList.cpp @@ -8,7 +8,7 @@ ExportsListElement::ExportsListElement( const StorageID & destination_table_id_, UInt64 part_size_, const String & part_name_, - const String & target_file_name_, + const std::vector & destination_file_paths_, UInt64 total_rows_to_read_, UInt64 total_size_bytes_compressed_, UInt64 total_size_bytes_uncompressed_, @@ -18,7 +18,7 @@ ExportsListElement::ExportsListElement( , destination_table_id(destination_table_id_) , part_size(part_size_) , part_name(part_name_) -, destination_file_path(target_file_name_) +, destination_file_paths(destination_file_paths_) , total_rows_to_read(total_rows_to_read_) , total_size_bytes_compressed(total_size_bytes_compressed_) , total_size_bytes_uncompressed(total_size_bytes_uncompressed_) @@ -40,16 +40,21 @@ ExportInfo ExportsListElement::getInfo() const res.destination_database = destination_table_id.database_name; res.destination_table = destination_table_id.table_name; res.part_name = part_name; - res.destination_file_path = destination_file_path; - res.rows_read = rows_read; + + { + std::shared_lock lock(destination_file_paths_mutex); + res.destination_file_paths = destination_file_paths; + } + + res.rows_read = rows_read.load(std::memory_order_relaxed); res.total_rows_to_read = total_rows_to_read; res.total_size_bytes_compressed = total_size_bytes_compressed; res.total_size_bytes_uncompressed = total_size_bytes_uncompressed; - res.bytes_read_uncompressed = bytes_read_uncompressed; + res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed); res.memory_usage = getMemoryUsage(); res.peak_memory_usage = getPeakMemoryUsage(); res.create_time = create_time; - res.elapsed = elapsed; + res.elapsed = watch.elapsedSeconds(); return res; } diff --git a/src/Storages/MergeTree/ExportList.h b/src/Storages/MergeTree/ExportList.h index 3c4daa07737b..d799c68cd21c 100644 --- a/src/Storages/MergeTree/ExportList.h +++ b/src/Storages/MergeTree/ExportList.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace CurrentMetrics { @@ -23,7 +24,7 @@ 
struct ExportInfo String destination_database; String destination_table; String part_name; - String destination_file_path; + std::vector destination_file_paths; UInt64 rows_read; UInt64 total_rows_to_read; UInt64 total_size_bytes_compressed; @@ -41,24 +42,26 @@ struct ExportsListElement : private boost::noncopyable const StorageID destination_table_id; const UInt64 part_size; const String part_name; - String destination_file_path; - UInt64 rows_read {0}; + + /// see destination_file_paths_mutex + std::vector destination_file_paths; + std::atomic rows_read {0}; UInt64 total_rows_to_read {0}; UInt64 total_size_bytes_compressed {0}; UInt64 total_size_bytes_uncompressed {0}; - UInt64 bytes_read_uncompressed {0}; + std::atomic bytes_read_uncompressed {0}; time_t create_time {0}; - Float64 elapsed {0}; Stopwatch watch; ThreadGroupPtr thread_group; + mutable std::shared_mutex destination_file_paths_mutex; ExportsListElement( const StorageID & source_table_id_, const StorageID & destination_table_id_, UInt64 part_size_, const String & part_name_, - const String & destination_file_path_, + const std::vector & destination_file_paths_, UInt64 total_rows_to_read_, UInt64 total_size_bytes_compressed_, UInt64 total_size_bytes_uncompressed_, diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp index a43c45d0edaf..ad737fedcb21 100644 --- a/src/Storages/MergeTree/ExportPartTask.cpp +++ b/src/Storages/MergeTree/ExportPartTask.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include namespace ProfileEvents { @@ -36,17 +38,24 @@ namespace ErrorCodes namespace Setting { extern const SettingsUInt64 min_bytes_to_use_direct_io; + extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file; + extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; } -ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_, ContextPtr context_) +ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExportManifest & manifest_) : storage(storage_), - manifest(manifest_), - local_context(context_) + manifest(manifest_) { } bool ExportPartTask::executeStep() { + auto local_context = Context::createCopy(storage.getContext()); + local_context->makeQueryContextForExportPart(); + local_context->setCurrentQueryId(manifest.transaction_id); + local_context->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART); + local_context->setSettings(manifest.settings); + const auto & metadata_snapshot = manifest.metadata_snapshot; Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); @@ -75,7 +84,7 @@ bool ExportPartTask::executeStep() manifest.destination_storage_id, manifest.data_part->getBytesOnDisk(), manifest.data_part->name, - "not_computed_yet", + std::vector{}, manifest.data_part->rows_count, manifest.data_part->getBytesOnDisk(), manifest.data_part->getBytesUncompressedOnDisk(), @@ -84,115 +93,75 @@ bool ExportPartTask::executeStep() SinkToStoragePtr sink; + const auto new_file_path_callback = [&exports_list_entry](const std::string & file_path) + { + std::unique_lock lock((*exports_list_entry)->destination_file_paths_mutex); + (*exports_list_entry)->destination_file_paths.push_back(file_path); + }; + try { sink = destination_storage->import( manifest.data_part->name + "_" + manifest.data_part->checksums.getTotalChecksumHex(), block_with_partition_values, - 
(*exports_list_entry)->destination_file_path, + new_file_path_callback, manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite, - manifest.format_settings, + manifest.settings[Setting::export_merge_tree_part_max_bytes_per_file], + manifest.settings[Setting::export_merge_tree_part_max_rows_per_file], + getFormatSettings(local_context), local_context); - } - catch (const Exception & e) - { - if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS) - { - ProfileEvents::increment(ProfileEvents::PartsExportDuplicated); - - /// File already exists and the policy is NO_OP, treat it as success. - if (manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::skip) - { - storage.writePartLog( - PartLogElement::Type::EXPORT_PART, - {}, - static_cast((*exports_list_entry)->elapsed * 1000000000), - manifest.data_part->name, - manifest.data_part, - {manifest.data_part}, - nullptr, - nullptr, - exports_list_entry.get()); - - std::lock_guard inner_lock(storage.export_manifests_mutex); - storage.export_manifests.erase(manifest); - ProfileEvents::increment(ProfileEvents::PartsExports); - ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast((*exports_list_entry)->elapsed * 1000)); - - if (manifest.completion_callback) - manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_path)); - return false; - } - } + bool apply_deleted_mask = true; + bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk(); + bool prefetch = false; - tryLogCurrentException(__PRETTY_FUNCTION__); - - ProfileEvents::increment(ProfileEvents::PartsExportFailures); + MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params + { + .metadata_version = metadata_snapshot->getMetadataVersion(), + .min_part_metadata_version = manifest.data_part->getMetadataVersion() + }; - std::lock_guard inner_lock(storage.export_manifests_mutex); - storage.export_manifests.erase(manifest); + auto mutations_snapshot = storage.getMutationsSnapshot(mutations_snapshot_params); + auto alter_conversions = MergeTreeData::getAlterConversionsForPart( + manifest.data_part, + mutations_snapshot, + local_context); - if (manifest.completion_callback) - manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createFailure(e)); - return false; - } + QueryPlan plan_for_part; - bool apply_deleted_mask = true; - bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk(); - bool prefetch = false; + createReadFromPartStep( + read_type, + plan_for_part, + storage, + storage.getStorageSnapshot(metadata_snapshot, local_context), + RangesInDataPart(manifest.data_part), + alter_conversions, + nullptr, + columns_to_read, + nullptr, + apply_deleted_mask, + std::nullopt, + read_with_direct_io, + prefetch, + local_context, + getLogger("ExportPartition")); - MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params - { - .metadata_version = metadata_snapshot->getMetadataVersion(), - .min_part_metadata_version = manifest.data_part->getMetadataVersion() - }; + ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ""); - auto mutations_snapshot = storage.getMutationsSnapshot(mutations_snapshot_params); - auto alter_conversions = MergeTreeData::getAlterConversionsForPart( - manifest.data_part, - 
mutations_snapshot, - local_context); + QueryPlanOptimizationSettings optimization_settings(local_context); + auto pipeline_settings = BuildQueryPipelineSettings(local_context); + auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); - QueryPlan plan_for_part; - - createReadFromPartStep( - read_type, - plan_for_part, - storage, - storage.getStorageSnapshot(metadata_snapshot, local_context), - RangesInDataPart(manifest.data_part), - alter_conversions, - nullptr, - columns_to_read, - nullptr, - apply_deleted_mask, - std::nullopt, - read_with_direct_io, - prefetch, - local_context, - getLogger("ExportPartition")); - - - ThreadGroupSwitcher switcher((*exports_list_entry)->thread_group, ""); - - QueryPlanOptimizationSettings optimization_settings(local_context); - auto pipeline_settings = BuildQueryPipelineSettings(local_context); - auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings); - - builder->setProgressCallback([&exports_list_entry](const Progress & progress) - { - (*exports_list_entry)->bytes_read_uncompressed += progress.read_bytes; - (*exports_list_entry)->rows_read += progress.read_rows; - (*exports_list_entry)->elapsed = (*exports_list_entry)->watch.elapsedSeconds(); - }); + builder->setProgressCallback([&exports_list_entry](const Progress & progress) + { + (*exports_list_entry)->bytes_read_uncompressed += progress.read_bytes; + (*exports_list_entry)->rows_read += progress.read_rows; + }); - pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); - pipeline.complete(sink); + pipeline.complete(sink); - try - { CompletedPipelineExecutor exec(pipeline); auto is_cancelled_callback = [this]() @@ -213,7 +182,7 @@ bool ExportPartTask::executeStep() storage.writePartLog( PartLogElement::Type::EXPORT_PART, {}, - static_cast((*exports_list_entry)->elapsed * 1000000000), + (*exports_list_entry)->watch.elapsed(), manifest.data_part->name, manifest.data_part, {manifest.data_part}, @@ -224,22 +193,52 @@ bool ExportPartTask::executeStep() storage.export_manifests.erase(manifest); ProfileEvents::increment(ProfileEvents::PartsExports); - ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, static_cast((*exports_list_entry)->elapsed * 1000)); + ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, (*exports_list_entry)->watch.elapsedMilliseconds()); if (manifest.completion_callback) - manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_path)); + manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_paths)); } catch (const Exception & e) { - tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("while exporting the part {}. User should retry.", manifest.data_part->name)); + if (e.code() == ErrorCodes::FILE_ALREADY_EXISTS) + { + ProfileEvents::increment(ProfileEvents::PartsExportDuplicated); + + /// File already exists and the policy is NO_OP, treat it as success. 
+ if (manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::skip) + { + storage.writePartLog( + PartLogElement::Type::EXPORT_PART, + {}, + (*exports_list_entry)->watch.elapsed(), + manifest.data_part->name, + manifest.data_part, + {manifest.data_part}, + nullptr, + nullptr, + exports_list_entry.get()); + + std::lock_guard inner_lock(storage.export_manifests_mutex); + storage.export_manifests.erase(manifest); + + ProfileEvents::increment(ProfileEvents::PartsExports); + ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, (*exports_list_entry)->watch.elapsedMilliseconds()); + + if (manifest.completion_callback) + { + manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createSuccess((*exports_list_entry)->destination_file_paths)); + } + + return false; + } + } ProfileEvents::increment(ProfileEvents::PartsExportFailures); - std::lock_guard inner_lock(storage.export_manifests_mutex); storage.writePartLog( PartLogElement::Type::EXPORT_PART, ExecutionStatus::fromCurrentException("", true), - static_cast((*exports_list_entry)->elapsed * 1000000000), + (*exports_list_entry)->watch.elapsed(), manifest.data_part->name, manifest.data_part, {manifest.data_part}, @@ -247,13 +246,14 @@ bool ExportPartTask::executeStep() nullptr, exports_list_entry.get()); + std::lock_guard inner_lock(storage.export_manifests_mutex); storage.export_manifests.erase(manifest); if (manifest.completion_callback) manifest.completion_callback(MergeTreePartExportManifest::CompletionCallbackResult::createFailure(e)); - - throw; + return false; } + return false; } diff --git a/src/Storages/MergeTree/ExportPartTask.h b/src/Storages/MergeTree/ExportPartTask.h index bcec68b2b737..9c8fa6cc01dd 100644 --- a/src/Storages/MergeTree/ExportPartTask.h +++ b/src/Storages/MergeTree/ExportPartTask.h @@ -12,8 +12,7 @@ class ExportPartTask : public IExecutableTask public: explicit ExportPartTask( MergeTreeData & storage_, - const MergeTreePartExportManifest & manifest_, - ContextPtr context_); + const MergeTreePartExportManifest & manifest_); bool executeStep() override; void onCompleted() override; StorageID getStorageID() const override; @@ -25,7 +24,6 @@ class ExportPartTask : public IExecutableTask private: MergeTreeData & storage; MergeTreePartExportManifest manifest; - ContextPtr local_context; QueryPipeline pipeline; std::atomic cancel_requested = false; diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index ab3a8ce361c7..a71cf6ae0e45 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -28,6 +28,8 @@ namespace context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding); context_copy->setSetting("max_threads", manifest.max_threads); context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy))); + context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file); + context_copy->setSetting("export_merge_tree_part_max_rows_per_file", manifest.max_rows_per_file); return context_copy; } } @@ -175,7 +177,7 @@ void ExportPartitionTaskScheduler::handlePartExportCompletion( if (result.success) { - handlePartExportSuccess(manifest, destination_storage, processing_parts_path, processed_part_path, part_name, export_path, zk, 
result.relative_path_in_destination_storage); + handlePartExportSuccess(manifest, destination_storage, processing_parts_path, processed_part_path, part_name, export_path, zk, result.relative_paths_in_destination_storage); } else { @@ -191,12 +193,17 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess( const std::string & part_name, const std::filesystem::path & export_path, const zkutil::ZooKeeperPtr & zk, - const String & relative_path_in_destination_storage + const std::vector & relative_paths_in_destination_storage ) { - LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} exported successfully", relative_path_in_destination_storage); + LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} exported successfully, paths size: {}", part_name, relative_paths_in_destination_storage.size()); - if (!tryToMovePartToProcessed(export_path, processing_parts_path, processed_part_path, part_name, relative_path_in_destination_storage, zk)) + for (const auto & relative_path_in_destination_storage : relative_paths_in_destination_storage) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: {}", relative_path_in_destination_storage); + } + + if (!tryToMovePartToProcessed(export_path, processing_parts_path, processed_part_path, part_name, relative_paths_in_destination_storage, zk)) { LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to move part to processed, will not commit export partition"); return; @@ -323,7 +330,7 @@ bool ExportPartitionTaskScheduler::tryToMovePartToProcessed( const std::filesystem::path & processing_parts_path, const std::filesystem::path & processed_part_path, const std::string & part_name, - const String & relative_path_in_destination_storage, + const std::vector & relative_paths_in_destination_storage, const zkutil::ZooKeeperPtr & zk ) { @@ -348,7 +355,7 @@ bool ExportPartitionTaskScheduler::tryToMovePartToProcessed( ExportReplicatedMergeTreePartitionProcessedPartEntry processed_part_entry; processed_part_entry.part_name = part_name; - processed_part_entry.path_in_destination = relative_path_in_destination_storage; + processed_part_entry.paths_in_destination = relative_paths_in_destination_storage; processed_part_entry.finished_by = storage.replica_name; requests.emplace_back(zkutil::makeRemoveRequest(processing_parts_path / part_name, -1)); diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.h b/src/Storages/MergeTree/ExportPartitionTaskScheduler.h index 0045019a4ec7..29a41fde1cb9 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.h +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.h @@ -37,7 +37,7 @@ class ExportPartitionTaskScheduler const std::string & part_name, const std::filesystem::path & export_path, const zkutil::ZooKeeperPtr & zk, - const String & relative_path_in_destination_storage + const std::vector & relative_paths_in_destination_storage ); void handlePartExportFailure( @@ -53,7 +53,7 @@ class ExportPartitionTaskScheduler const std::filesystem::path & processing_parts_path, const std::filesystem::path & processed_part_path, const std::string & part_name, - const String & relative_path_in_destination_storage, + const std::vector & relative_paths_in_destination_storage, const zkutil::ZooKeeperPtr & zk ); diff --git a/src/Storages/MergeTree/ExportPartitionUtils.cpp b/src/Storages/MergeTree/ExportPartitionUtils.cpp index 466eb79e8367..51ed72b7f6f6 100644 --- a/src/Storages/MergeTree/ExportPartitionUtils.cpp +++ b/src/Storages/MergeTree/ExportPartitionUtils.cpp @@ -55,7 +55,10 @@ 
namespace ExportPartitionUtils const auto processed_part_entry = ExportReplicatedMergeTreePartitionProcessedPartEntry::fromJsonString(responses[i].data); - exported_paths.emplace_back(processed_part_entry.path_in_destination); + for (const auto & path_in_destination : processed_part_entry.paths_in_destination) + { + exported_paths.emplace_back(path_in_destination); + } } return exported_paths; @@ -71,13 +74,19 @@ namespace ExportPartitionUtils { const auto exported_paths = ExportPartitionUtils::getExportedPaths(log, zk, entry_path); - if (exported_paths.size() != manifest.parts.size()) + if (exported_paths.empty()) + { + LOG_WARNING(log, "ExportPartition: No exported paths found, will not commit export. This might be a bug"); + return; + } + + //// not checking for an exact match because a single part might generate multiple files + if (exported_paths.size() < manifest.parts.size()) { - LOG_INFO(log, "ExportPartition: Skipping {}: exported paths size does not match parts size, this is a BUG", entry_path); + LOG_WARNING(log, "ExportPartition: Reached the commit phase, but exported paths size is less than the number of parts, will not commit export. This might be a bug"); return; } - LOG_INFO(log, "ExportPartition: Exported paths size matches parts size, commit the export"); destination_storage->commitExportPartitionTransaction(manifest.transaction_id, manifest.partition_id, exported_paths, context); LOG_INFO(log, "ExportPartition: Committed export, mark as completed"); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 260508764f6d..e90a6e3ffc0b 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -6261,7 +6261,7 @@ void MergeTreeData::exportPartToTable( part, transaction_id, query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, - format_settings, + query_context->getSettingsCopy(), source_metadata_ptr, completion_callback); @@ -9066,7 +9066,9 @@ try part_log_elem.rows_read = (*exports_entry)->rows_read; part_log_elem.bytes_read_uncompressed = (*exports_entry)->bytes_read_uncompressed; part_log_elem.peak_memory_usage = (*exports_entry)->getPeakMemoryUsage(); - part_log_elem.path_on_disk = (*exports_entry)->destination_file_path; + + /// no need to lock because at this point no one is writing to the destination file paths + part_log_elem.remote_file_paths = (*exports_entry)->destination_file_paths; } if (profile_counters) @@ -9134,12 +9136,7 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee) continue; } - auto context_copy = Context::createCopy(getContext()); - context_copy->makeQueryContextForExportPart(); - context_copy->setCurrentQueryId(manifest.transaction_id); - context_copy->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART); - - auto task = std::make_shared(*this, manifest, context_copy); + auto task = std::make_shared(*this, manifest); manifest.in_progress = assignee.scheduleMoveTask(task); diff --git a/src/Storages/MergeTree/MergeTreePartExportManifest.h b/src/Storages/MergeTree/MergeTreePartExportManifest.h index 533eeb6decdd..db6626d22e0a 100644 --- a/src/Storages/MergeTree/MergeTreePartExportManifest.h +++ b/src/Storages/MergeTree/MergeTreePartExportManifest.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB { @@ -22,22 +23,22 @@ struct MergeTreePartExportManifest struct CompletionCallbackResult { private: - CompletionCallbackResult(bool success_, const 
String & relative_path_in_destination_storage_, std::optional exception_) - : success(success_), relative_path_in_destination_storage(relative_path_in_destination_storage_), exception(std::move(exception_)) {} + CompletionCallbackResult(bool success_, const std::vector & relative_paths_in_destination_storage_, std::optional exception_) + : success(success_), relative_paths_in_destination_storage(relative_paths_in_destination_storage_), exception(std::move(exception_)) {} public: - static CompletionCallbackResult createSuccess(const String & relative_path_in_destination_storage_) + static CompletionCallbackResult createSuccess(const std::vector & relative_paths_in_destination_storage_) { - return CompletionCallbackResult(true, relative_path_in_destination_storage_, std::nullopt); + return CompletionCallbackResult(true, relative_paths_in_destination_storage_, std::nullopt); } static CompletionCallbackResult createFailure(Exception exception_) { - return CompletionCallbackResult(false, "", std::move(exception_)); + return CompletionCallbackResult(false, {}, std::move(exception_)); } bool success = false; - String relative_path_in_destination_storage; + std::vector relative_paths_in_destination_storage; std::optional exception; }; @@ -46,14 +47,14 @@ struct MergeTreePartExportManifest const DataPartPtr & data_part_, const String & transaction_id_, FileAlreadyExistsPolicy file_already_exists_policy_, - const FormatSettings & format_settings_, + const Settings & settings_, const StorageMetadataPtr & metadata_snapshot_, std::function completion_callback_ = {}) : destination_storage_id(destination_storage_id_), data_part(data_part_), transaction_id(transaction_id_), file_already_exists_policy(file_already_exists_policy_), - format_settings(format_settings_), + settings(settings_), metadata_snapshot(metadata_snapshot_), completion_callback(completion_callback_), create_time(time(nullptr)) {} @@ -63,7 +64,7 @@ struct MergeTreePartExportManifest /// Used for killing the export. 
String transaction_id; FileAlreadyExistsPolicy file_already_exists_policy; - FormatSettings format_settings; + Settings settings; /// Metadata snapshot captured at the time of query validation to prevent race conditions with mutations /// Otherwise the export could fail if the schema changes between validation and execution diff --git a/src/Storages/ObjectStorage/MultiFileStorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/MultiFileStorageObjectStorageSink.cpp new file mode 100644 index 000000000000..815e1c804eac --- /dev/null +++ b/src/Storages/ObjectStorage/MultiFileStorageObjectStorageSink.cpp @@ -0,0 +1,146 @@ +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FILE_ALREADY_EXISTS; +} + +MultiFileStorageObjectStorageSink::MultiFileStorageObjectStorageSink( + const std::string & base_path_, + const String & transaction_id_, + ObjectStoragePtr object_storage_, + StorageObjectStorageConfigurationPtr configuration_, + std::size_t max_bytes_per_file_, + std::size_t max_rows_per_file_, + bool overwrite_if_exists_, + const std::function & new_file_path_callback_, + const std::optional & format_settings_, + SharedHeader sample_block_, + ContextPtr context_) + : SinkToStorage(sample_block_), + base_path(base_path_), + transaction_id(transaction_id_), + object_storage(object_storage_), + configuration(configuration_), + max_bytes_per_file(max_bytes_per_file_), + max_rows_per_file(max_rows_per_file_), + overwrite_if_exists(overwrite_if_exists_), + new_file_path_callback(new_file_path_callback_), + format_settings(format_settings_), + sample_block(sample_block_), + context(context_) +{ + current_sink = createNewSink(); +} + +MultiFileStorageObjectStorageSink::~MultiFileStorageObjectStorageSink() +{ + if (isCancelled()) + current_sink->cancel(); +} + +/// Adds a counter that represents file index to the file path. +/// Example: +/// Input is `table_root/year=2025/month=12/day=12/file.parquet` +/// Output is `table_root/year=2025/month=12/day=12/file.1.parquet` +std::string MultiFileStorageObjectStorageSink::generateNewFilePath() +{ + const auto file_format = Poco::toLower(configuration->getFormat()); + const auto index_string = std::to_string(file_paths.size() + 1); + std::size_t pos = base_path.rfind(file_format); + + /// normal case - path ends with the file format + if (pos != std::string::npos) + { + const auto path_without_extension = base_path.substr(0, pos); + const auto file_format_extension = "." + file_format; + + return path_without_extension + index_string + file_format_extension; + } + + /// if no extension is found, just append the index - I am not even sure this is possible + return base_path + "." + index_string; +} + +std::shared_ptr MultiFileStorageObjectStorageSink::createNewSink() +{ + auto new_path = generateNewFilePath(); + + /// todo + /// sounds like bad design, but callers might decide to ignore the exception, and if we throw it before the callback + /// they will not be able to grab the file path. + /// maybe I should consider moving the file already exists policy in here? 
+ new_file_path_callback(new_path); + + file_paths.emplace_back(std::move(new_path)); + + if (!overwrite_if_exists && object_storage->exists(StoredObject(file_paths.back()))) + { + throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "File {} already exists", file_paths.back()); + } + + return std::make_shared(file_paths.back(), object_storage, configuration, format_settings, sample_block, context); +} + +void MultiFileStorageObjectStorageSink::consume(Chunk & chunk) +{ + if (isCancelled()) + { + current_sink->cancel(); + return; + } + + const auto written_bytes = current_sink->getWrittenBytes(); + + const bool exceeded_bytes_limit = max_bytes_per_file && written_bytes >= max_bytes_per_file; + const bool exceeded_rows_limit = max_rows_per_file && current_sink_written_rows >= max_rows_per_file; + + if (exceeded_bytes_limit || exceeded_rows_limit) + { + current_sink->onFinish(); + current_sink = createNewSink(); + current_sink_written_rows = 0; + } + + current_sink->consume(chunk); + current_sink_written_rows += chunk.getNumRows(); +} + +void MultiFileStorageObjectStorageSink::onFinish() +{ + current_sink->onFinish(); + commit(); +} + +void MultiFileStorageObjectStorageSink::commit() +{ + /// the commit file path should be in the same directory as the data files + const auto commit_file_path = fs::path(base_path).parent_path() / ("commit_" + transaction_id); + + if (!overwrite_if_exists && object_storage->exists(StoredObject(commit_file_path))) + { + throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "Commit file {} already exists, aborting {} export", commit_file_path, transaction_id); + } + + auto out = object_storage->writeObject( + StoredObject(commit_file_path), + WriteMode::Rewrite, /* attributes= */ + {}, DBMS_DEFAULT_BUFFER_SIZE, + context->getWriteSettings()); + + for (const auto & p : file_paths) + { + out->write(p.data(), p.size()); + out->write("\n", 1); + } + + out->finalize(); +} + +} diff --git a/src/Storages/ObjectStorage/MultiFileStorageObjectStorageSink.h b/src/Storages/ObjectStorage/MultiFileStorageObjectStorageSink.h new file mode 100644 index 000000000000..51f6b8094232 --- /dev/null +++ b/src/Storages/ObjectStorage/MultiFileStorageObjectStorageSink.h @@ -0,0 +1,57 @@ +#pragma once + +#include + +namespace DB +{ + +/// This is useful when the data is too large to fit into a single file. +/// It will create a new file when the current file exceeds the max bytes or max rows limit. 
+/// Ships a commit file including the list of data files to make it transactional +class MultiFileStorageObjectStorageSink : public SinkToStorage +{ +public: + MultiFileStorageObjectStorageSink( + const std::string & base_path_, + const String & transaction_id_, + ObjectStoragePtr object_storage_, + StorageObjectStorageConfigurationPtr configuration_, + std::size_t max_bytes_per_file_, + std::size_t max_rows_per_file_, + bool overwrite_if_exists_, + const std::function & new_file_path_callback_, + const std::optional & format_settings_, + SharedHeader sample_block_, + ContextPtr context_); + + ~MultiFileStorageObjectStorageSink() override; + + void consume(Chunk & chunk) override; + + void onFinish() override; + + String getName() const override { return "MultiFileStorageObjectStorageSink"; } + +private: + const std::string base_path; + const String transaction_id; + ObjectStoragePtr object_storage; + StorageObjectStorageConfigurationPtr configuration; + std::size_t max_bytes_per_file; + std::size_t max_rows_per_file; + bool overwrite_if_exists; + std::function new_file_path_callback; + const std::optional format_settings; + SharedHeader sample_block; + ContextPtr context; + + std::vector file_paths; + std::shared_ptr current_sink; + std::size_t current_sink_written_rows = 0; + + std::string generateNewFilePath(); + std::shared_ptr createNewSink(); + void commit(); +}; + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index 597fea59cd21..e932b373bf11 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include @@ -501,8 +502,10 @@ bool StorageObjectStorage::supportsImport() const SinkToStoragePtr StorageObjectStorage::import( const std::string & file_name, Block & block_with_partition_values, - std::string & destination_file_path, + const std::function & new_file_path_callback, bool overwrite_if_exists, + std::size_t max_bytes_per_file, + std::size_t max_rows_per_file, const std::optional & format_settings_, ContextPtr local_context) { @@ -518,17 +521,17 @@ SinkToStoragePtr StorageObjectStorage::import( } } - destination_file_path = configuration->getPathForWrite(partition_key, file_name).path; + const auto base_path = configuration->getPathForWrite(partition_key, file_name).path; - if (!overwrite_if_exists && object_storage->exists(StoredObject(destination_file_path))) - { - throw Exception(ErrorCodes::FILE_ALREADY_EXISTS, "File {} already exists", destination_file_path); - } - - return std::make_shared( - destination_file_path, + return std::make_shared( + base_path, + /* transaction_id= */ file_name, /// not pretty, but the sink needs some sort of id to generate the commit file name. Using the source part name should be enough object_storage, configuration, + max_bytes_per_file, + max_rows_per_file, + overwrite_if_exists, + new_file_path_callback, format_settings_ ? 
format_settings_ : format_settings, std::make_shared(getInMemoryMetadataPtr()->getSampleBlock()), local_context); diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.h b/src/Storages/ObjectStorage/StorageObjectStorage.h index 9b1fe6ea9aee..f79940c40e12 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.h +++ b/src/Storages/ObjectStorage/StorageObjectStorage.h @@ -87,8 +87,10 @@ class StorageObjectStorage : public IStorage SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - std::string & /* destination_file_path */, + const std::function & new_file_path_callback, bool /* overwrite_if_exists */, + std::size_t /* max_bytes_per_file */, + std::size_t /* max_rows_per_file */, const std::optional & /* format_settings_ */, ContextPtr /* context */) override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 377bd3aff111..f02f65e5ad80 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -584,15 +584,17 @@ bool StorageObjectStorageCluster::supportsImport() const SinkToStoragePtr StorageObjectStorageCluster::import( const std::string & file_name, Block & block_with_partition_values, - std::string & destination_file_path, + const std::function & new_file_path_callback, bool overwrite_if_exists, + std::size_t max_bytes_per_file, + std::size_t max_rows_per_file, const std::optional & format_settings_, ContextPtr context) { if (pure_storage) - return pure_storage->import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, format_settings_, context); + return pure_storage->import(file_name, block_with_partition_values, new_file_path_callback, overwrite_if_exists, max_bytes_per_file, max_rows_per_file, format_settings_, context); - return IStorageCluster::import(file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, format_settings_, context); + return IStorageCluster::import(file_name, block_with_partition_values, new_file_path_callback, overwrite_if_exists, max_bytes_per_file, max_rows_per_file, format_settings_, context); } void StorageObjectStorageCluster::readFallBackToPure( diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h index 72f5bf5dc009..7295ef4dbf68 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.h @@ -128,8 +128,10 @@ class StorageObjectStorageCluster : public IStorageCluster SinkToStoragePtr import( const std::string & /* file_name */, Block & /* block_with_partition_values */, - std::string & /* destination_file_path */, + const std::function & /* new_file_path_callback */, bool /* overwrite_if_exists */, + std::size_t /* max_bytes_per_file */, + std::size_t /* max_rows_per_file */, const std::optional & /* format_settings_ */, ContextPtr /* context */) override; bool prefersLargeBlocks() const override; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp index 48525da19ec7..c395d61eacbb 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.cpp @@ -136,6 +136,13 @@ size_t StorageObjectStorageSink::getFileSize() const return *result_file_size; } +size_t 
StorageObjectStorageSink::getWrittenBytes() const +{ + if (!write_buf) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Buffer must be initialized before requesting written bytes"); + return write_buf->count(); +} + PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink( ObjectStoragePtr object_storage_, StorageObjectStorageConfigurationPtr configuration_, diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSink.h b/src/Storages/ObjectStorage/StorageObjectStorageSink.h index 39873998ad7a..1a35b50ab25b 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSink.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSink.h @@ -29,6 +29,8 @@ friend class StorageObjectStorageImporterSink; const String & getPath() const { return path; } + size_t getWrittenBytes() const; + size_t getFileSize() const; private: diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ec6c43ff775e..0e6b54f06f86 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -200,6 +200,8 @@ namespace Setting extern const SettingsBool output_format_parquet_parallel_encoding; extern const SettingsMaxThreads max_threads; extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy; + extern const SettingsUInt64 export_merge_tree_part_max_bytes_per_file; + extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; } namespace MergeTreeSetting @@ -8222,6 +8224,8 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.max_threads = query_context->getSettingsRef()[Setting::max_threads]; manifest.parallel_formatting = query_context->getSettingsRef()[Setting::output_format_parallel_formatting]; manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding]; + manifest.max_bytes_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_bytes_per_file]; + manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file]; manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value; diff --git a/src/Storages/System/StorageSystemExports.cpp b/src/Storages/System/StorageSystemExports.cpp index bd56a40c3a68..30ba713d914f 100644 --- a/src/Storages/System/StorageSystemExports.cpp +++ b/src/Storages/System/StorageSystemExports.cpp @@ -22,7 +22,7 @@ ColumnsDescription StorageSystemExports::getColumnsDescription() {"destination_table", std::make_shared(), "Name of the destination table."}, {"create_time", std::make_shared(), "Date and time when the export command was received in the server."}, {"part_name", std::make_shared(), "Name of the part"}, - {"destination_file_path", std::make_shared(), "File path where the part is being exported."}, + {"destination_file_paths", std::make_shared(std::make_shared()), "File paths where the part is being exported."}, {"elapsed", std::make_shared(), "The time elapsed (in seconds) since the export started."}, {"rows_read", std::make_shared(), "The number of rows read from the exported part."}, {"total_rows_to_read", std::make_shared(), "The total number of rows to read from the exported part."}, @@ -51,7 +51,11 @@ void StorageSystemExports::fillData(MutableColumns & res_columns, ContextPtr con res_columns[i++]->insert(export_info.destination_table); 
res_columns[i++]->insert(export_info.create_time); res_columns[i++]->insert(export_info.part_name); - res_columns[i++]->insert(export_info.destination_file_path); + Array destination_file_paths_array; + destination_file_paths_array.reserve(export_info.destination_file_paths.size()); + for (const auto & file_path : export_info.destination_file_paths) + destination_file_paths_array.push_back(file_path); + res_columns[i++]->insert(destination_file_paths_array); res_columns[i++]->insert(export_info.elapsed); res_columns[i++]->insert(export_info.rows_read); res_columns[i++]->insert(export_info.total_rows_to_read); diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference index 00fc51f68254..d11773c3c9cd 100644 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference @@ -31,3 +31,14 @@ 2 2020 3 2020 4 2021 +---- Test max_bytes and max_rows per file +---- Count files in big_destination_max_bytes, should be 5 (4 parquet, 1 commit) +5 +---- Count rows in big_table and big_destination_max_bytes +4194304 +4194304 +---- Count files in big_destination_max_rows, should be 5 (4 parquet, 1 commit) +5 +---- Count rows in big_table and big_destination_max_rows +4194304 +4194304 diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh index ae7f05dac90a..669da7a9d163 100755 --- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh +++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh @@ -15,6 +15,9 @@ s3_table="s3_table_${RANDOM}" s3_table_wildcard="s3_table_wildcard_${RANDOM}" s3_table_wildcard_partition_expression_with_function="s3_table_wildcard_partition_expression_with_function_${RANDOM}" mt_table_roundtrip="mt_table_roundtrip_${RANDOM}" +big_table="big_table_${RANDOM}" +big_destination_max_bytes="big_destination_max_bytes_${RANDOM}" +big_destination_max_rows="big_destination_max_rows_${RANDOM}" query() { $CLICKHOUSE_CLIENT --query "$1" @@ -73,4 +76,42 @@ sleep 1 echo "---- Both data parts should appear" query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard_partition_expression_with_function/**.parquet') ORDER BY id" -query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function" +echo "---- Test max_bytes and max_rows per file" + +query "CREATE TABLE $big_table (id UInt64, data String, year UInt16) Engine=MergeTree() order by id partition by year" + +query "CREATE TABLE $big_destination_max_bytes(id UInt64, data String, year UInt16) engine=S3(s3_conn, filename='$big_destination_max_bytes', partition_strategy='hive', format=Parquet) partition by year" + +query "CREATE TABLE $big_destination_max_rows(id UInt64, data String, year UInt16) engine=S3(s3_conn, filename='$big_destination_max_rows', partition_strategy='hive', format=Parquet) partition by year" + +# 4194304 is a number that came up during multiple iterations, it does not really mean anything (aside from the fact that the below numbers depend on it) +query "INSERT INTO $big_table SELECT number AS id, repeat('x', 100) AS data, 2025 AS year FROM numbers(4194304)" + +# make sure we have only one part +query "OPTIMIZE 
TABLE $big_table FINAL" + +big_part=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$big_table' AND partition_id = '2025' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n') + +# this should generate ~4 files +query "ALTER TABLE $big_table EXPORT PART '$big_part' TO TABLE $big_destination_max_bytes SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_bytes_per_file=3500000, output_format_parquet_row_group_size_bytes=1000000" +# export_merge_tree_part_max_rows_per_file = 1048576 (which is 4194304/4) to generate 4 files +query "ALTER TABLE $big_table EXPORT PART '$big_part' TO TABLE $big_destination_max_rows SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_rows_per_file=1048576" + +# sleeping a little longer because it will write multiple files, trying not be flaky +sleep 20 + +echo "---- Count files in big_destination_max_bytes, should be 5 (4 parquet, 1 commit)" +query "SELECT count(_file) FROM s3(s3_conn, filename='$big_destination_max_bytes/**', format='One')" + +echo "---- Count rows in big_table and big_destination_max_bytes" +query "SELECT COUNT() from $big_table" +query "SELECT COUNT() from $big_destination_max_bytes" + +echo "---- Count files in big_destination_max_rows, should be 5 (4 parquet, 1 commit)" +query "SELECT count(_file) FROM s3(s3_conn, filename='$big_destination_max_rows/**', format='One')" + +echo "---- Count rows in big_table and big_destination_max_rows" +query "SELECT COUNT() from $big_table" +query "SELECT COUNT() from $big_destination_max_rows" + +query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function, $big_table, $big_destination_max_bytes, $big_destination_max_rows"