From 920adbf8b3d504c556267e3108c1fc4da2ed932e Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Wed, 22 Jan 2025 14:02:26 -0300 Subject: [PATCH 1/2] draft impl --- src/Interpreters/InterpreterAlterQuery.cpp | 5 ++ src/Parsers/ASTAlterQuery.cpp | 6 ++ src/Parsers/ASTAlterQuery.h | 1 + src/Parsers/CommonParsers.h | 1 + src/Parsers/ParserAlterQuery.cpp | 26 ++++++ src/Storages/MergeTree/MergeTreeData.cpp | 16 ++++ .../MergeTree/exportMTPartToParquet.cpp | 84 +++++++++++++++++++ .../MergeTree/exportMTPartToParquet.h | 10 +++ src/Storages/PartitionCommands.cpp | 8 ++ src/Storages/PartitionCommands.h | 1 + 10 files changed, 158 insertions(+) create mode 100644 src/Storages/MergeTree/exportMTPartToParquet.cpp create mode 100644 src/Storages/MergeTree/exportMTPartToParquet.h diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 907026c73a35..530cc86f1dc4 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -479,6 +479,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS } break; } + case ASTAlterCommand::EXPORT_PART: + { + required_access.emplace_back(AccessType::ALTER_MOVE_PARTITION, database, table); + break; + } case ASTAlterCommand::REPLACE_PARTITION: { required_access.emplace_back(AccessType::SELECT, command.from_database, command.from_table); diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index fe258b6df15e..137e6a878712 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -343,6 +343,12 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett ostr << quoteString(move_destination_name); } } + else if (type == ASTAlterCommand::EXPORT_PART) + { + ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << (part ? "PART " : "PARTITION ") + << (settings.hilite ? hilite_none : ""); + partition->formatImpl(ostr, settings, state, frame); + } else if (type == ASTAlterCommand::REPLACE_PARTITION) { ostr << (settings.hilite ? hilite_keyword : "") << (replace ? "REPLACE" : "ATTACH") << " PARTITION " diff --git a/src/Parsers/ASTAlterQuery.h b/src/Parsers/ASTAlterQuery.h index ab19ed9a55ab..9e4280bae58a 100644 --- a/src/Parsers/ASTAlterQuery.h +++ b/src/Parsers/ASTAlterQuery.h @@ -71,6 +71,7 @@ class ASTAlterCommand : public IAST FREEZE_ALL, UNFREEZE_PARTITION, UNFREEZE_ALL, + EXPORT_PART, DELETE, UPDATE, diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index ef52eab6cf78..f0705508f445 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -325,6 +325,7 @@ namespace DB MR_MACROS(MONTHS, "MONTHS") \ MR_MACROS(MOVE_PART, "MOVE PART") \ MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ + MR_MACROS(EXPORT_PART, "EXPORT PART") \ MR_MACROS(MOVE, "MOVE") \ MR_MACROS(MS, "MS") \ MR_MACROS(MUTATION, "MUTATION") \ diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index 0c8f05c90d2d..a3988350bb3f 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -82,6 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION); ParserKeyword s_move_partition(Keyword::MOVE_PARTITION); ParserKeyword s_move_part(Keyword::MOVE_PART); + ParserKeyword s_export_part(Keyword::EXPORT_PART); ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION); ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART); ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION); @@ -554,6 +555,31 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected command->move_destination_name = ast_space_name->as().value.safeGet(); } } + else if (s_export_part.ignore(pos, expected)) + { + if (!parser_string_and_substituion.parse(pos, command_partition, expected)) + return false; + + command->type = ASTAlterCommand::EXPORT_PART; + command->part = true; + +// if (s_to_disk.ignore(pos, expected)) +// command->move_destination_type = DataDestinationType::DISK; +// else if (s_to_volume.ignore(pos, expected)) +// command->move_destination_type = DataDestinationType::VOLUME; +// else if (s_to_shard.ignore(pos, expected)) +// { +// command->move_destination_type = DataDestinationType::SHARD; +// } +// else +// return false; +// +// ASTPtr ast_space_name; +// if (!parser_string_literal.parse(pos, ast_space_name, expected)) +// return false; +// +// command->move_destination_name = ast_space_name->as().value.safeGet(); + } else if (s_add_constraint.ignore(pos, expected)) { if (s_if_not_exists.ignore(pos, expected)) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a8aaa8448c7c..3da04e8d4cba 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -85,6 +85,7 @@ #include #include #include +#include #include #include @@ -5594,6 +5595,21 @@ Pipe MergeTreeData::alterPartition( } break; + case PartitionCommand::EXPORT_PART: + { + if (command.part) + { + auto part_name = command.partition->as().value.safeGet(); + auto data_part = getPartIfExists(part_name, {DataPartStates::value_type::Active}); + + if (!data_part) + throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in committed state", part_name); + + exportMTPartToParquet(*this, data_part, query_context); + } + break; + } + case PartitionCommand::DROP_DETACHED_PARTITION: dropDetached(command.partition, command.part, query_context); break; diff --git a/src/Storages/MergeTree/exportMTPartToParquet.cpp b/src/Storages/MergeTree/exportMTPartToParquet.cpp new file mode 100644 index 000000000000..ac99f5a87f65 --- /dev/null +++ b/src/Storages/MergeTree/exportMTPartToParquet.cpp @@ -0,0 +1,84 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, ContextPtr context) +{ + auto metadata_snapshot = data.getInMemoryMetadataPtr(); + Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); + StorageSnapshotPtr storage_snapshot = data.getStorageSnapshot(metadata_snapshot, context); + + MergeTreeData::IMutationsSnapshot::Params params + { + .metadata_version = metadata_snapshot->getMetadataVersion(), + .min_part_metadata_version = data_part->getMetadataVersion(), + }; + + auto mutations_snapshot = data.getMutationsSnapshot(params); + + auto alter_conversions = MergeTreeData::getAlterConversionsForPart( + data_part, + mutations_snapshot, + metadata_snapshot, + context); + + QueryPlan plan; + + // todoa arthur + MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Merge; + + bool apply_deleted_mask = true; + bool read_with_direct_io = false; + bool prefetch = false; + + createReadFromPartStep( + read_type, + plan, + data, + storage_snapshot, + data_part, + alter_conversions, + columns_to_read, + nullptr, + apply_deleted_mask, + std::nullopt, + read_with_direct_io, + prefetch, + context, + getLogger("abcde")); + + auto pipeline_settings = BuildQueryPipelineSettings::fromContext(context); + auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context); + auto builder = plan.buildQueryPipeline(optimization_settings, pipeline_settings); + + QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); + auto header_block = pipeline.getHeader(); + + auto out_file_name = data_part->name + ".parquet"; + + auto out_file = std::make_shared(out_file_name); + auto parquet_output = FormatFactory::instance().getOutputFormat("Parquet", *out_file, header_block, context); + PullingPipelineExecutor executor(pipeline); + + Block block; + while (executor.pull(block)) + { + parquet_output->write(block); + } + + parquet_output->finalize(); + + out_file->finalize(); +} + +} diff --git a/src/Storages/MergeTree/exportMTPartToParquet.h b/src/Storages/MergeTree/exportMTPartToParquet.h new file mode 100644 index 000000000000..46beef551594 --- /dev/null +++ b/src/Storages/MergeTree/exportMTPartToParquet.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +namespace DB +{ + +void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, ContextPtr context); + +} diff --git a/src/Storages/PartitionCommands.cpp b/src/Storages/PartitionCommands.cpp index 0f7dadd75b6a..a75211f45121 100644 --- a/src/Storages/PartitionCommands.cpp +++ b/src/Storages/PartitionCommands.cpp @@ -51,6 +51,14 @@ std::optional PartitionCommand::parse(const ASTAlterCommand * res.part = command_ast->part; return res; } + if (command_ast->type == ASTAlterCommand::EXPORT_PART) + { + PartitionCommand res; + res.type = EXPORT_PART; + res.partition = command_ast->partition->clone(); + res.part = command_ast->part; + return res; + } if (command_ast->type == ASTAlterCommand::MOVE_PARTITION) { PartitionCommand res; diff --git a/src/Storages/PartitionCommands.h b/src/Storages/PartitionCommands.h index 917e510f24b4..15d2a7fb869f 100644 --- a/src/Storages/PartitionCommands.h +++ b/src/Storages/PartitionCommands.h @@ -33,6 +33,7 @@ struct PartitionCommand UNFREEZE_ALL_PARTITIONS, UNFREEZE_PARTITION, REPLACE_PARTITION, + EXPORT_PART, }; Type type = UNKNOWN; From 1e38200b28dd4d9cd0ce00a33545c25bf8d49207 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Mon, 10 Feb 2025 11:05:00 -0300 Subject: [PATCH 2/2] impl --- src/Interpreters/InterpreterAlterQuery.cpp | 1 + src/Parsers/ASTAlterQuery.cpp | 17 +++++ src/Parsers/CommonParsers.h | 2 +- src/Parsers/ParserAlterQuery.cpp | 30 ++++----- src/Storages/MergeTree/MergeTreeData.cpp | 66 ++++++++----------- src/Storages/MergeTree/MergeTreeData.h | 2 + .../MergeTree/exportMTPartToParquet.h | 10 --- ...oParquet.cpp => exportMTPartToStorage.cpp} | 42 +++++------- .../MergeTree/exportMTPartToStorage.h | 10 +++ .../ObjectStorage/S3/Configuration.cpp | 2 +- .../ObjectStorage/StorageObjectStorage.cpp | 1 + src/Storages/PartitionCommands.cpp | 3 + src/Storages/StorageMergeTree.cpp | 38 +++++++++++ src/Storages/StorageMergeTree.h | 1 + 14 files changed, 130 insertions(+), 95 deletions(-) delete mode 100644 src/Storages/MergeTree/exportMTPartToParquet.h rename src/Storages/MergeTree/{exportMTPartToParquet.cpp => exportMTPartToStorage.cpp} (63%) create mode 100644 src/Storages/MergeTree/exportMTPartToStorage.h diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 530cc86f1dc4..fc18efda717c 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -482,6 +482,7 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS case ASTAlterCommand::EXPORT_PART: { required_access.emplace_back(AccessType::ALTER_MOVE_PARTITION, database, table); + required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table); break; } case ASTAlterCommand::REPLACE_PARTITION: diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index 137e6a878712..24491f02ef0c 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -348,6 +348,23 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << (part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : ""); partition->formatImpl(ostr, settings, state, frame); + ostr << " TO "; + switch (move_destination_type) + { + case DataDestinationType::TABLE: + ostr << "TABLE "; + if (!to_database.empty()) + { + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database) + << (settings.hilite ? hilite_none : "") << "."; + } + ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table) + << (settings.hilite ? hilite_none : ""); + return; + default: + break; + } + } else if (type == ASTAlterCommand::REPLACE_PARTITION) { diff --git a/src/Parsers/CommonParsers.h b/src/Parsers/CommonParsers.h index f0705508f445..d08c880407ca 100644 --- a/src/Parsers/CommonParsers.h +++ b/src/Parsers/CommonParsers.h @@ -325,7 +325,7 @@ namespace DB MR_MACROS(MONTHS, "MONTHS") \ MR_MACROS(MOVE_PART, "MOVE PART") \ MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \ - MR_MACROS(EXPORT_PART, "EXPORT PART") \ + MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \ MR_MACROS(MOVE, "MOVE") \ MR_MACROS(MS, "MS") \ MR_MACROS(MUTATION, "MUTATION") \ diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index a3988350bb3f..00275210c438 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -82,7 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION); ParserKeyword s_move_partition(Keyword::MOVE_PARTITION); ParserKeyword s_move_part(Keyword::MOVE_PART); - ParserKeyword s_export_part(Keyword::EXPORT_PART); + ParserKeyword s_export_part(Keyword::EXPORT_PARTITION); ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION); ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART); ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION); @@ -557,28 +557,20 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected } else if (s_export_part.ignore(pos, expected)) { - if (!parser_string_and_substituion.parse(pos, command_partition, expected)) + if (!parser_partition.parse(pos, command_partition, expected)) return false; command->type = ASTAlterCommand::EXPORT_PART; - command->part = true; +// command->part = true; + + if (!s_to_table.ignore(pos, expected)) + { + return false; + } -// if (s_to_disk.ignore(pos, expected)) -// command->move_destination_type = DataDestinationType::DISK; -// else if (s_to_volume.ignore(pos, expected)) -// command->move_destination_type = DataDestinationType::VOLUME; -// else if (s_to_shard.ignore(pos, expected)) -// { -// command->move_destination_type = DataDestinationType::SHARD; -// } -// else -// return false; -// -// ASTPtr ast_space_name; -// if (!parser_string_literal.parse(pos, ast_space_name, expected)) -// return false; -// -// command->move_destination_name = ast_space_name->as().value.safeGet(); + if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table)) + return false; + command->move_destination_type = DataDestinationType::TABLE; } else if (s_add_constraint.ignore(pos, expected)) { diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 3da04e8d4cba..6fb5bc289e97 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -9,24 +9,10 @@ #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include +#include +#include #include #include #include @@ -45,19 +31,20 @@ #include #include #include -#include -#include #include #include #include #include #include #include +#include +#include #include +#include #include -#include -#include #include +#include +#include #include #include #include @@ -65,27 +52,41 @@ #include #include #include -#include #include #include #include #include -#include #include +#include #include #include #include -#include -#include +#include #include +#include +#include +#include #include #include -#include +#include +#include +#include #include #include #include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include @@ -5597,16 +5598,7 @@ Pipe MergeTreeData::alterPartition( case PartitionCommand::EXPORT_PART: { - if (command.part) - { - auto part_name = command.partition->as().value.safeGet(); - auto data_part = getPartIfExists(part_name, {DataPartStates::value_type::Active}); - - if (!data_part) - throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in committed state", part_name); - - exportMTPartToParquet(*this, data_part, query_context); - } + exportPartitionToTable(command, query_context); break; } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index ff1a9e766cf8..8dd2876962f9 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -856,6 +856,8 @@ class MergeTreeData : public IStorage, public WithMutableContext /// Moves partition to specified Table void movePartitionToTable(const PartitionCommand & command, ContextPtr query_context); + virtual void exportPartitionToTable(const PartitionCommand &, ContextPtr) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "export not implemented");} + /// Checks that Partition could be dropped right now /// Otherwise - throws an exception with detailed information. /// We do not use mutex because it is not very important that the size could change during the operation. diff --git a/src/Storages/MergeTree/exportMTPartToParquet.h b/src/Storages/MergeTree/exportMTPartToParquet.h deleted file mode 100644 index 46beef551594..000000000000 --- a/src/Storages/MergeTree/exportMTPartToParquet.h +++ /dev/null @@ -1,10 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - -void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, ContextPtr context); - -} diff --git a/src/Storages/MergeTree/exportMTPartToParquet.cpp b/src/Storages/MergeTree/exportMTPartToStorage.cpp similarity index 63% rename from src/Storages/MergeTree/exportMTPartToParquet.cpp rename to src/Storages/MergeTree/exportMTPartToStorage.cpp index ac99f5a87f65..498ccfbca762 100644 --- a/src/Storages/MergeTree/exportMTPartToParquet.cpp +++ b/src/Storages/MergeTree/exportMTPartToStorage.cpp @@ -1,22 +1,22 @@ -#include -#include -#include -#include +#include +#include #include +#include +#include +#include #include -#include -#include -#include +#include +#include namespace DB { -void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, ContextPtr context) +void exportMTPartToStorage(const MergeTreeData & source_data, const MergeTreeData::DataPartPtr & data_part, SinkToStoragePtr dst_storage_sink, ContextPtr context) { - auto metadata_snapshot = data.getInMemoryMetadataPtr(); + auto metadata_snapshot = source_data.getInMemoryMetadataPtr(); Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical(); - StorageSnapshotPtr storage_snapshot = data.getStorageSnapshot(metadata_snapshot, context); + StorageSnapshotPtr storage_snapshot = source_data.getStorageSnapshot(metadata_snapshot, context); MergeTreeData::IMutationsSnapshot::Params params { @@ -24,7 +24,7 @@ void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::Data .min_part_metadata_version = data_part->getMetadataVersion(), }; - auto mutations_snapshot = data.getMutationsSnapshot(params); + auto mutations_snapshot = source_data.getMutationsSnapshot(params); auto alter_conversions = MergeTreeData::getAlterConversionsForPart( data_part, @@ -44,7 +44,7 @@ void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::Data createReadFromPartStep( read_type, plan, - data, + source_data, storage_snapshot, data_part, alter_conversions, @@ -62,23 +62,11 @@ void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::Data auto builder = plan.buildQueryPipeline(optimization_settings, pipeline_settings); QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); - auto header_block = pipeline.getHeader(); - - auto out_file_name = data_part->name + ".parquet"; - - auto out_file = std::make_shared(out_file_name); - auto parquet_output = FormatFactory::instance().getOutputFormat("Parquet", *out_file, header_block, context); - PullingPipelineExecutor executor(pipeline); - - Block block; - while (executor.pull(block)) - { - parquet_output->write(block); - } - parquet_output->finalize(); + pipeline.complete(std::move(dst_storage_sink)); - out_file->finalize(); + CompletedPipelineExecutor executor(pipeline); + executor.execute(); } } diff --git a/src/Storages/MergeTree/exportMTPartToStorage.h b/src/Storages/MergeTree/exportMTPartToStorage.h new file mode 100644 index 000000000000..28f08540ab03 --- /dev/null +++ b/src/Storages/MergeTree/exportMTPartToStorage.h @@ -0,0 +1,10 @@ +#pragma once + +#include + +namespace DB +{ + +void exportMTPartToStorage(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, SinkToStoragePtr dst_storage_sink, ContextPtr context); + +} diff --git a/src/Storages/ObjectStorage/S3/Configuration.cpp b/src/Storages/ObjectStorage/S3/Configuration.cpp index 629628c762fa..dace797ec153 100644 --- a/src/Storages/ObjectStorage/S3/Configuration.cpp +++ b/src/Storages/ObjectStorage/S3/Configuration.cpp @@ -117,7 +117,7 @@ StorageObjectStorage::QuerySettings StorageS3Configuration::getQuerySettings(con const auto & settings = context->getSettingsRef(); return StorageObjectStorage::QuerySettings{ .truncate_on_insert = settings[Setting::s3_truncate_on_insert], - .create_new_file_on_insert = settings[Setting::s3_create_new_file_on_insert], + . create_new_file_on_insert = settings[Setting::s3_create_new_file_on_insert], .schema_inference_use_cache = settings[Setting::schema_inference_use_cache_for_s3], .schema_inference_mode = settings[Setting::schema_inference_mode], .skip_empty_files = settings[Setting::s3_skip_empty_files], diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp index b0284508562f..9cd041ef1416 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp @@ -366,6 +366,7 @@ SinkToStoragePtr StorageObjectStorage::write( configuration->getPath()); } + // todo arthur continue from here if (configuration->withGlobsIgnorePartitionWildcard()) { throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, diff --git a/src/Storages/PartitionCommands.cpp b/src/Storages/PartitionCommands.cpp index a75211f45121..813a7b128bd2 100644 --- a/src/Storages/PartitionCommands.cpp +++ b/src/Storages/PartitionCommands.cpp @@ -57,6 +57,9 @@ std::optional PartitionCommand::parse(const ASTAlterCommand * res.type = EXPORT_PART; res.partition = command_ast->partition->clone(); res.part = command_ast->part; + res.move_destination_type = PartitionCommand::MoveDestinationType::TABLE; + res.to_database = command_ast->to_database; + res.to_table = command_ast->to_table; return res; } if (command_ast->type == ASTAlterCommand::MOVE_PARTITION) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index a99be13ae24d..17d8595ef565 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -39,6 +39,8 @@ #include #include #include +#include +#include namespace DB @@ -2447,6 +2449,42 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const } } +/* + * For now, this function is meant to be used when exporting to different formats (i.e, the case where data needs to be re-encoded / serialized) + * For the cases where this is not necessary, there are way more optimal ways of doing that, such as hard links implemented by `movePartitionToTable` + * */ +void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context) +{ + String dest_database = query_context->resolveDatabase(command.to_database); + auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context); + + /// The target table and the source table are the same. + if (dest_storage->getStorageID() == this->getStorageID()) + return; + + bool async_insert = areAsynchronousInsertsEnabled(); + + auto query = std::make_shared(); + + String partition_id = getPartitionIDFromQuery(command.partition, getContext()); + auto src_parts = getVisibleDataPartsVectorInPartition(getContext(), partition_id); + + if (src_parts.empty()) + { + return; + } + + auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef()[Setting::lock_acquire_timeout]); + auto lock2 = dest_storage->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef()[Setting::lock_acquire_timeout]); + auto merges_blocker = stopMergesAndWait(); + + for (const auto & data_part : src_parts) + { + auto sink = dest_storage->write(query, getInMemoryMetadataPtr(), getContext(), async_insert); + exportMTPartToStorage(*this, data_part, sink, query_context); + } +} + ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type) { if (action_type == ActionLocks::PartsMerge) diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 7bc070b12b47..58204dccd648 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -245,6 +245,7 @@ class StorageMergeTree final : public MergeTreeData void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr context) override; void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) override; + void exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context) override; bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override; /// Update mutation entries after part mutation execution. May reset old /// errors if mutation was successful. Otherwise update last_failed* fields