115 changes: 112 additions & 3 deletions docs/en/engines/table-engines/mergetree-family/part_export.md
@@ -2,7 +2,7 @@

## Overview

The `ALTER TABLE EXPORT PART` command exports individual MergeTree data parts to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format.
The `ALTER TABLE EXPORT PART` command exports individual MergeTree data parts to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format. A commit file listing all data files exported within that transaction is written to the same destination directory.

**Key Characteristics:**
- **Experimental feature** - must be enabled via `allow_experimental_export_merge_tree_part` setting
@@ -48,6 +48,18 @@ Source and destination tables must be 100% compatible:
- **Default**: `false`
- **Description**: If set to `true`, it will overwrite the file. Otherwise, fails with exception.

### `export_merge_tree_part_max_bytes_per_file` (Optional)

- **Type**: `UInt64`
- **Default**: `0`
- **Description**: Maximum number of bytes to write to a single file when exporting a merge tree part. `0` means no limit. This is not a hard limit; the actual file size depends heavily on the output format granularity and the input source chunk size. Using this setting may break idempotency, so use it with care.

### `export_merge_tree_part_max_rows_per_file` (Optional)

- **Type**: `UInt64`
- **Default**: `0`
- **Description**: Maximum number of rows to write to a single file when exporting a merge tree part. `0` means no limit. This is not a hard limit; the actual row count per file depends heavily on the output format granularity and the input source chunk size. Using this setting may break idempotency, so use it with care.

## Examples

### Basic Export to S3
Expand Down Expand Up @@ -93,7 +105,7 @@ destination_database: default
destination_table: destination_table
create_time: 2025-11-19 09:09:11
part_name: 20251016-365_1_1_0
destination_file_path: table_root/eventDate=2025-10-16/retention=365/20251016-365_1_1_0_17B2F6CD5D3C18E787C07AE3DAF16EB1.parquet
destination_file_paths: ['table_root/eventDate=2025-10-16/retention=365/20251016-365_1_1_0_17B2F6CD5D3C18E787C07AE3DAF16EB1.1.parquet']
elapsed: 2.04845441
rows_read: 1138688 -- 1.14 million
total_rows_to_read: 550961374 -- 550.96 million
Expand Down Expand Up @@ -138,7 +150,8 @@ partition_id: 2021
partition: 2021
part_type: Compact
disk_name: default
path_on_disk: year=2021/2021_0_0_0_78C704B133D41CB0EF64DD2A9ED3B6BA.parquet
path_on_disk:
remote_file_paths: ['year=2021/2021_0_0_0_78C704B133D41CB0EF64DD2A9ED3B6BA.1.parquet']
rows: 1
size_in_bytes: 272
merged_from: ['2021_0_0_0']
@@ -158,3 +171,99 @@ ProfileEvents: {}
- `PartsExportDuplicated` - Number of part exports that failed because the target already exists.
- `PartsExportTotalMilliseconds` - Total time spent exporting parts, in milliseconds.

### Split large files

Setting `export_merge_tree_part_max_bytes_per_file` (or `export_merge_tree_part_max_rows_per_file`) splits the exported part into multiple numbered files, as shown below:
```sql
ALTER TABLE big_table EXPORT PART '2025_0_32_3' TO TABLE replicated_big_destination
SETTINGS export_merge_tree_part_max_bytes_per_file = 10000000, output_format_parquet_row_group_size_bytes = 5000000;

arthur :) select * from system.exports;

SELECT *
FROM system.exports

Query id: d78d9ce5-cfbc-4957-b7dd-bc8129811634

Row 1:
──────
source_database: default
source_table: big_table
destination_database: default
destination_table: replicated_big_destination
create_time: 2025-12-15 13:12:48
part_name: 2025_0_32_3
destination_file_paths: ['replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.1.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.2.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.3.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.4.parquet']
elapsed: 14.360427274
rows_read: 10256384 -- 10.26 million
total_rows_to_read: 10485760 -- 10.49 million
total_size_bytes_compressed: 83779395 -- 83.78 million
total_size_bytes_uncompressed: 10611691600 -- 10.61 billion
bytes_read_uncompressed: 10440998912 -- 10.44 billion
memory_usage: 89795477 -- 89.80 million
peak_memory_usage: 107362133 -- 107.36 million

1 row in set. Elapsed: 0.014 sec.

arthur :) select * from system.part_log where event_type = 'ExportPart' order by event_time desc limit 1 format Vertical;

SELECT *
FROM system.part_log
WHERE event_type = 'ExportPart'
ORDER BY event_time DESC
LIMIT 1
FORMAT Vertical

Query id: 95128b01-b751-4726-8e3e-320728ac6af7

Row 1:
──────
hostname: arthur
query_id:
event_type: ExportPart
merge_reason: NotAMerge
merge_algorithm: Undecided
event_date: 2025-12-15
event_time: 2025-12-15 13:13:03
event_time_microseconds: 2025-12-15 13:13:03.197492
duration_ms: 14673
database: default
table: big_table
table_uuid: a3eeeea0-295c-41a3-84ef-6b5463dbbe8c
part_name: 2025_0_32_3
partition_id: 2025
partition: 2025
part_type: Wide
disk_name: default
path_on_disk: ./store/a3e/a3eeeea0-295c-41a3-84ef-6b5463dbbe8c/2025_0_32_3/
remote_file_paths: ['replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.1.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.2.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.3.parquet','replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.4.parquet']
rows: 10485760 -- 10.49 million
size_in_bytes: 83779395 -- 83.78 million
merged_from: ['2025_0_32_3']
bytes_uncompressed: 10611691600 -- 10.61 billion
read_rows: 10485760 -- 10.49 million
read_bytes: 10674503680 -- 10.67 billion
peak_memory_usage: 107362133 -- 107.36 million
error: 0
exception:
ProfileEvents: {}

1 row in set. Elapsed: 0.044 sec.

arthur :) select _path, formatReadableSize(_size) as _size from s3(s3_conn, filename='**', format=One);

SELECT
_path,
formatReadableSize(_size) AS _size
FROM s3(s3_conn, filename = '**', format = One)

Query id: c48ae709-f590-4d1b-8158-191f8d628966

┌─_path────────────────────────────────────────────────────────────────────────────────┬─_size─────┐
1. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.1.parquet │ 17.36 MiB │
2. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.2.parquet │ 17.32 MiB │
3. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.4.parquet │ 5.04 MiB │
4. │ test/replicated_big/year=2025/2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7.3.parquet │ 17.40 MiB │
5. │ test/replicated_big/year=2025/commit_2025_0_32_3_E439C23833C39C6E5104F6F4D1048BE7 │ 320.00 B │
└──────────────────────────────────────────────────────────────────────────────────────┴───────────┘

5 rows in set. Elapsed: 0.072 sec.
```
8 changes: 8 additions & 0 deletions src/Core/Settings.cpp
@@ -6899,6 +6899,14 @@ Possible values:
- `` (empty value) - use session timezone

Default value is `UTC`.
)", 0) \
DECLARE(UInt64, export_merge_tree_part_max_bytes_per_file, 0, R"(
Maximum number of bytes to write to a single file when exporting a merge tree part. 0 means no limit.
This is not a hard limit, and it highly depends on the output format granularity and input source chunk size.
)", 0) \
DECLARE(UInt64, export_merge_tree_part_max_rows_per_file, 0, R"(
Maximum number of rows to write to a single file when exporting a merge tree part. 0 means no limit.
This is not a hard limit, and it highly depends on the output format granularity and input source chunk size.
)", 0) \
\
/* ####################################################### */ \
4 changes: 3 additions & 1 deletion src/Core/SettingsChangesHistory.cpp
@@ -55,7 +55,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.8",
{
7 changes: 7 additions & 0 deletions src/Interpreters/PartLog.cpp
@@ -134,6 +134,7 @@ ColumnsDescription PartLogElement::getColumnsDescription()
{"part_type", std::make_shared<DataTypeString>(), "The type of the part. Possible values: Wide and Compact."},
{"disk_name", std::make_shared<DataTypeString>(), "The disk name data part lies on."},
{"path_on_disk", std::make_shared<DataTypeString>(), "Absolute path to the folder with data part files."},
{"remote_file_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "In case of an export operation to remote storages, the file paths a given export generated"},

{"rows", std::make_shared<DataTypeUInt64>(), "The number of rows in the data part."},
{"size_in_bytes", std::make_shared<DataTypeUInt64>(), "Size of the data part on disk in bytes."},
@@ -187,6 +188,12 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(disk_name);
columns[i++]->insert(path_on_disk);

Array remote_file_paths_array;
remote_file_paths_array.reserve(remote_file_paths.size());
for (const auto & remote_file_path : remote_file_paths)
remote_file_paths_array.push_back(remote_file_path);
columns[i++]->insert(remote_file_paths_array);

columns[i++]->insert(rows);
columns[i++]->insert(bytes_compressed_on_disk);

1 change: 1 addition & 0 deletions src/Interpreters/PartLog.h
@@ -71,6 +71,7 @@ struct PartLogElement
String partition;
String disk_name;
String path_on_disk;
std::vector<String> remote_file_paths;

MergeTreeDataPartType part_type;

17 changes: 13 additions & 4 deletions src/Storages/ExportReplicatedMergeTreePartitionManifest.h
@@ -62,14 +62,14 @@ struct ExportReplicatedMergeTreePartitionProcessingPartEntry
struct ExportReplicatedMergeTreePartitionProcessedPartEntry
{
String part_name;
String path_in_destination;
std::vector<String> paths_in_destination;
String finished_by;

std::string toJsonString() const
{
Poco::JSON::Object json;
json.set("part_name", part_name);
json.set("path_in_destination", path_in_destination);
json.set("paths_in_destination", paths_in_destination);
json.set("finished_by", finished_by);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
@@ -86,7 +86,11 @@ struct ExportReplicatedMergeTreePartitionProcessedPartEntry
ExportReplicatedMergeTreePartitionProcessedPartEntry entry;

entry.part_name = json->getValue<String>("part_name");
entry.path_in_destination = json->getValue<String>("path_in_destination");

const auto paths_in_destination_array = json->getArray("paths_in_destination");
for (size_t i = 0; i < paths_in_destination_array->size(); ++i)
entry.paths_in_destination.emplace_back(paths_in_destination_array->getElement<String>(static_cast<unsigned int>(i)));
Comment on lines +90 to +92

P1: Handle legacy processed-part manifest entries

fromJsonString now assumes every processed entry contains a paths_in_destination array and immediately dereferences it. Existing ZooKeeper nodes written by previous versions stored a single path_in_destination string, so after an upgrade getArray will return null (or throw), crashing ExportPartitionUtils::getExportedPaths when it reads legacy entries and preventing pending exports from ever committing. Please keep parsing the old field or guard against missing arrays before dereferencing.
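A backward-compatible parse could follow the `json->has(...)` pattern already used for `file_already_exists_policy` further down in this file. A minimal sketch (the helper name is illustrative, not the PR's code):

```cpp
/// Sketch only: accept both the new array field and the legacy single-path
/// field written by older versions.
static std::vector<String> parseDestinationPaths(const Poco::JSON::Object::Ptr & json)
{
    std::vector<String> paths;
    if (json->has("paths_in_destination"))
    {
        const auto array = json->getArray("paths_in_destination");
        for (size_t i = 0; i < array->size(); ++i)
            paths.emplace_back(array->getElement<String>(static_cast<unsigned int>(i)));
    }
    else if (json->has("path_in_destination"))
    {
        /// Legacy processed-part entry: a single destination path.
        paths.emplace_back(json->getValue<String>("path_in_destination"));
    }
    return paths;
}
```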

Collaborator

sigh we've been through that multiple times


entry.finished_by = json->getValue<String>("finished_by");

return entry;
@@ -108,6 +112,8 @@ struct ExportReplicatedMergeTreePartitionManifest
size_t max_threads;
bool parallel_formatting;
bool parquet_parallel_encoding;
size_t max_bytes_per_file;
size_t max_rows_per_file;
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;

std::string toJsonString() const
Expand All @@ -127,6 +133,8 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("parallel_formatting", parallel_formatting);
json.set("max_threads", max_threads);
json.set("parquet_parallel_encoding", parquet_parallel_encoding);
json.set("max_bytes_per_file", max_bytes_per_file);
json.set("max_rows_per_file", max_rows_per_file);
json.set("file_already_exists_policy", String(magic_enum::enum_name(file_already_exists_policy)));
json.set("create_time", create_time);
json.set("max_retries", max_retries);
@@ -160,7 +168,8 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.max_threads = json->getValue<size_t>("max_threads");
manifest.parallel_formatting = json->getValue<bool>("parallel_formatting");
manifest.parquet_parallel_encoding = json->getValue<bool>("parquet_parallel_encoding");

manifest.max_bytes_per_file = json->getValue<size_t>("max_bytes_per_file");
manifest.max_rows_per_file = json->getValue<size_t>("max_rows_per_file");
Comment on lines 168 to +172

P1: Partition export manifests from earlier replicas fail to parse

ExportReplicatedMergeTreePartitionManifest::fromJsonString now unconditionally reads max_bytes_per_file and max_rows_per_file via getValue, which throws if the fields are absent. ZooKeeper entries created before this change don’t contain those keys, so after upgrading a node any pending export_partition task will raise during manifest parsing and never be scheduled/committed. The parser needs a default when the new fields are missing to remain backward compatible with existing manifests.
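If backward compatibility were desired, the parser could default the missing fields to `0` (no limit), mirroring the `file_already_exists_policy` guard just below. A sketch assuming the `json` and `manifest` objects from `fromJsonString`:

```cpp
/// Sketch only: treat absent fields as "no limit" so manifests written by
/// older replicas still parse after an upgrade.
manifest.max_bytes_per_file = json->has("max_bytes_per_file") ? json->getValue<size_t>("max_bytes_per_file") : 0;
manifest.max_rows_per_file = json->has("max_rows_per_file") ? json->getValue<size_t>("max_rows_per_file") : 0;
```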

Collaborator Author

This is not a problem. Nobody is using the export partition feature for now, and even if they were, the zookeeper entries would have expired (unless a big TTL was set, which I doubt)

if (json->has("file_already_exists_policy"))
{
const auto file_already_exists_policy = magic_enum::enum_cast<MergeTreePartExportManifest::FileAlreadyExistsPolicy>(json->getValue<String>("file_already_exists_policy"));
4 changes: 3 additions & 1 deletion src/Storages/IStorage.h
@@ -466,8 +466,10 @@ It is currently only implemented in StorageObjectStorage.
virtual SinkToStoragePtr import(
const std::string & /* file_name */,
Block & /* block_with_partition_values */,
std::string & /* destination_file_path */,
const std::function<void(const std::string &)> & /* new_file_path_callback */,
bool /* overwrite_if_exists */,
std::size_t /* max_bytes_per_file */,
std::size_t /* max_rows_per_file */,
const std::optional<FormatSettings> & /* format_settings */,
ContextPtr /* context */)
{
17 changes: 11 additions & 6 deletions src/Storages/MergeTree/ExportList.cpp
@@ -8,7 +8,7 @@ ExportsListElement::ExportsListElement(
const StorageID & destination_table_id_,
UInt64 part_size_,
const String & part_name_,
const String & target_file_name_,
const std::vector<String> & destination_file_paths_,
UInt64 total_rows_to_read_,
UInt64 total_size_bytes_compressed_,
UInt64 total_size_bytes_uncompressed_,
@@ -18,7 +18,7 @@
, destination_table_id(destination_table_id_)
, part_size(part_size_)
, part_name(part_name_)
, destination_file_path(target_file_name_)
, destination_file_paths(destination_file_paths_)
, total_rows_to_read(total_rows_to_read_)
, total_size_bytes_compressed(total_size_bytes_compressed_)
, total_size_bytes_uncompressed(total_size_bytes_uncompressed_)
@@ -40,16 +40,21 @@ ExportInfo ExportsListElement::getInfo() const
res.destination_database = destination_table_id.database_name;
res.destination_table = destination_table_id.table_name;
res.part_name = part_name;
res.destination_file_path = destination_file_path;
res.rows_read = rows_read;

{
std::shared_lock lock(destination_file_paths_mutex);
res.destination_file_paths = destination_file_paths;
}

res.rows_read = rows_read.load(std::memory_order_relaxed);
res.total_rows_to_read = total_rows_to_read;
res.total_size_bytes_compressed = total_size_bytes_compressed;
res.total_size_bytes_uncompressed = total_size_bytes_uncompressed;
res.bytes_read_uncompressed = bytes_read_uncompressed;
res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed);
res.memory_usage = getMemoryUsage();
res.peak_memory_usage = getPeakMemoryUsage();
res.create_time = create_time;
res.elapsed = elapsed;
res.elapsed = watch.elapsedSeconds();
return res;
}

15 changes: 9 additions & 6 deletions src/Storages/MergeTree/ExportList.h
@@ -7,6 +7,7 @@
#include <Common/ThreadStatus.h>
#include <Poco/URI.h>
#include <boost/noncopyable.hpp>
#include <shared_mutex>

namespace CurrentMetrics
{
@@ -23,7 +24,7 @@ struct ExportInfo
String destination_database;
String destination_table;
String part_name;
String destination_file_path;
std::vector<String> destination_file_paths;
UInt64 rows_read;
UInt64 total_rows_to_read;
UInt64 total_size_bytes_compressed;
@@ -41,24 +42,26 @@ struct ExportsListElement : private boost::noncopyable
const StorageID destination_table_id;
const UInt64 part_size;
const String part_name;
String destination_file_path;
UInt64 rows_read {0};

/// see destination_file_paths_mutex
std::vector<String> destination_file_paths;
std::atomic<UInt64> rows_read {0};
UInt64 total_rows_to_read {0};
UInt64 total_size_bytes_compressed {0};
UInt64 total_size_bytes_uncompressed {0};
UInt64 bytes_read_uncompressed {0};
std::atomic<UInt64> bytes_read_uncompressed {0};
time_t create_time {0};
Float64 elapsed {0};

Stopwatch watch;
ThreadGroupPtr thread_group;
mutable std::shared_mutex destination_file_paths_mutex;

ExportsListElement(
const StorageID & source_table_id_,
const StorageID & destination_table_id_,
UInt64 part_size_,
const String & part_name_,
const String & destination_file_path_,
const std::vector<String> & destination_file_paths_,
UInt64 total_rows_to_read_,
UInt64 total_size_bytes_compressed_,
UInt64 total_size_bytes_uncompressed_,