diff --git a/docs/en/engines/table-engines/mergetree-family/part_export.md b/docs/en/engines/table-engines/mergetree-family/part_export.md
index e0d67b1654cf..287e0a17f3af 100644
--- a/docs/en/engines/table-engines/mergetree-family/part_export.md
+++ b/docs/en/engines/table-engines/mergetree-family/part_export.md
@@ -21,6 +21,16 @@ SETTINGS allow_experimental_export_merge_tree_part = 1
   [, setting_name = value, ...]
 ```
 
+## Syntax with table function
+
+```sql
+ALTER TABLE [database.]table_name
+EXPORT PART 'part_name'
+TO TABLE FUNCTION s3(s3_conn, filename='table_function', partition_strategy...)
+SETTINGS allow_experimental_export_merge_tree_part = 1
+  [, setting_name = value, ...]
+```
+
 ### Parameters
 
 - **`table_name`**: The source MergeTree table containing the part to export
@@ -34,6 +44,8 @@ Source and destination tables must be 100% compatible:
 1. **Identical schemas** - same columns, types, and order
 2. **Matching partition keys** - partition expressions must be identical
 
+If a table function is used as the destination, the schema can be omitted; it is inferred from the source table.
+
 ## Settings
 
 ### `allow_experimental_export_merge_tree_part` (Required)
@@ -95,6 +107,20 @@ ALTER TABLE mt_table EXPORT PART '2021_2_2_0' TO TABLE s3_table
 SETTINGS allow_experimental_export_merge_tree_part = 1;
 ```
 
+### Table function export
+
+```sql
+-- Create the source table
+CREATE TABLE mt_table (id UInt64, year UInt16)
+ENGINE = MergeTree() PARTITION BY year ORDER BY tuple();
+
+-- Insert and export
+INSERT INTO mt_table VALUES (1, 2020), (2, 2020), (3, 2021);
+
+ALTER TABLE mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION s3(s3_conn, filename='table_function', format=Parquet, partition_strategy='hive') PARTITION BY year
+SETTINGS allow_experimental_export_merge_tree_part = 1;
+```
+
 ## Monitoring
 
 ### Active Exports
diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp
index e280cd6318f9..2685d27d28df 100644
--- a/src/Interpreters/InterpreterAlterQuery.cpp
+++ b/src/Interpreters/InterpreterAlterQuery.cpp
@@ -542,7 +542,9 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
         case ASTAlterCommand::EXPORT_PART:
         {
             required_access.emplace_back(AccessType::ALTER_EXPORT_PART, database, table);
-            required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
+            /// For table functions, access control is handled by the table function itself
+            if (!command.to_table_function)
+                required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
             break;
         }
         case ASTAlterCommand::EXPORT_PARTITION:
diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp
index 57d81c92898f..1bb3c413e81b 100644
--- a/src/Parsers/ASTAlterQuery.cpp
+++ b/src/Parsers/ASTAlterQuery.cpp
@@ -65,6 +65,10 @@ ASTPtr ASTAlterCommand::clone() const
         res->sql_security = res->children.emplace_back(sql_security->clone()).get();
     if (rename_to)
         res->rename_to = res->children.emplace_back(rename_to->clone()).get();
+    if (to_table_function)
+        res->to_table_function = res->children.emplace_back(to_table_function->clone()).get();
+    if (partition_by_expr)
+        res->partition_by_expr = res->children.emplace_back(partition_by_expr->clone()).get();
 
     return res;
 }
@@ -367,11 +371,23 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
         {
             case DataDestinationType::TABLE:
                 ostr << "TABLE ";
-                if (!to_database.empty())
+                if (to_table_function)
+                {
+                    ostr << "FUNCTION ";
+                    to_table_function->format(ostr, settings, state, frame);
+                    if (partition_by_expr)
+                    {
+                        ostr << " PARTITION BY ";
+                        partition_by_expr->format(ostr, settings, state, frame);
+                    }
+                }
+                else
                 {
-                    ostr << backQuoteIfNeed(to_database) << ".";
+                    if (!to_database.empty())
+                        ostr << backQuoteIfNeed(to_database) << ".";
+
+                    ostr << backQuoteIfNeed(to_table);
                 }
-                ostr << backQuoteIfNeed(to_table);
                 return;
             default:
                 break;
@@ -584,6 +600,8 @@ void ASTAlterCommand::forEachPointerToChild(std::function f)
     f(reinterpret_cast(&select));
     f(reinterpret_cast(&sql_security));
     f(reinterpret_cast(&rename_to));
+    f(reinterpret_cast(&to_table_function));
+    f(reinterpret_cast(&partition_by_expr));
 }
diff --git a/src/Parsers/ASTAlterQuery.h b/src/Parsers/ASTAlterQuery.h
index 7683b2e11c3d..49800e833b22 100644
--- a/src/Parsers/ASTAlterQuery.h
+++ b/src/Parsers/ASTAlterQuery.h
@@ -220,6 +220,9 @@ class ASTAlterCommand : public IAST
     /// MOVE PARTITION partition TO TABLE db.table
     String to_database;
     String to_table;
+    /// EXPORT PART/PARTITION to TABLE FUNCTION (e.g., s3())
+    IAST * to_table_function = nullptr;
+    IAST * partition_by_expr = nullptr;
 
     String snapshot_name;
     IAST * snapshot_desc;
diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp
index eff14253b97f..a14ab699a919 100644
--- a/src/Parsers/ParserAlterQuery.cpp
+++ b/src/Parsers/ParserAlterQuery.cpp
@@ -93,6 +93,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
     ParserKeyword s_unfreeze(Keyword::UNFREEZE);
     ParserKeyword s_unlock_snapshot(Keyword::UNLOCK_SNAPSHOT);
     ParserKeyword s_partition(Keyword::PARTITION);
+    ParserKeyword s_partition_by(Keyword::PARTITION_BY);
     ParserKeyword s_first(Keyword::FIRST);
     ParserKeyword s_after(Keyword::AFTER);
 
@@ -107,6 +108,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
     ParserKeyword s_to_volume(Keyword::TO_VOLUME);
     ParserKeyword s_to_table(Keyword::TO_TABLE);
     ParserKeyword s_to_shard(Keyword::TO_SHARD);
+    ParserKeyword s_function(Keyword::FUNCTION);
     ParserKeyword s_delete(Keyword::DELETE);
     ParserKeyword s_update(Keyword::UPDATE);
 
@@ -176,6 +178,8 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
     ASTPtr command_rename_to;
     ASTPtr command_sql_security;
     ASTPtr command_snapshot_desc;
+    ASTPtr export_table_function;
+    ASTPtr export_table_function_partition_by_expr;
 
     if (with_round_bracket)
     {
@@ -550,9 +554,27 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
                 return false;
         }
 
-        if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
-            return false;
-        command->move_destination_type = DataDestinationType::TABLE;
+        if (s_function.ignore(pos, expected))
+        {
+            ParserFunction table_function_parser(/*allow_function_parameters=*/true, /*is_table_function=*/true);
+
+            if (!table_function_parser.parse(pos, export_table_function, expected))
+                return false;
+
+            if (s_partition_by.ignore(pos, expected))
+                if (!parser_exp_elem.parse(pos, export_table_function_partition_by_expr, expected))
+                    return false;
+
+            command->to_table_function = export_table_function.get();
+            command->partition_by_expr = export_table_function_partition_by_expr.get();
+            command->move_destination_type = DataDestinationType::TABLE;
+        }
+        else
+        {
+            if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
+                return false;
+            command->move_destination_type = DataDestinationType::TABLE;
+        }
     }
     else if (s_export_partition.ignore(pos, expected))
     {
@@ -1061,6 +1083,10 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
         command->rename_to = command->children.emplace_back(std::move(command_rename_to)).get();
     if (command_snapshot_desc)
         command->snapshot_desc = command->children.emplace_back(std::move(command_snapshot_desc)).get();
+    if (export_table_function)
+        command->to_table_function = command->children.emplace_back(std::move(export_table_function)).get();
+    if (export_table_function_partition_by_expr)
+        command->partition_by_expr = command->children.emplace_back(std::move(export_table_function_partition_by_expr)).get();
 
     return true;
 }
diff --git a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp
index ea8cc0cb536d..f6cd46092594 100644
--- a/src/Processors/Formats/Impl/Parquet/ReadManager.cpp
+++ b/src/Processors/Formats/Impl/Parquet/ReadManager.cpp
@@ -904,7 +904,10 @@ ReadManager::ReadResult ReadManager::read()
         {
             /// Pump the manual executor.
             lock.unlock();
-            if (!parser_shared_resources->parsing_runner.runTaskInline())
+            /// Note: the executor can be shared among multiple files, so we may execute someone
+            /// else's task, and someone else may execute our task.
+            /// Hence the thread_pool_was_idle check.
+            if (!parser_shared_resources->parsing_runner.runTaskInline() && thread_pool_was_idle)
                 throw Exception(ErrorCodes::LOGICAL_ERROR, "Deadlock in Parquet::ReadManager (single-threaded)");
             lock.lock();
         }
diff --git a/src/Storages/MergeTree/ExportPartTask.cpp b/src/Storages/MergeTree/ExportPartTask.cpp
index af9217e34e40..ae96a2bc1616 100644
--- a/src/Storages/MergeTree/ExportPartTask.cpp
+++ b/src/Storages/MergeTree/ExportPartTask.cpp
@@ -110,19 +110,12 @@ bool ExportPartTask::executeStep()
         block_with_partition_values = manifest.data_part->minmax_idx->getBlock(storage);
     }
 
-    auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, local_context);
-    if (!destination_storage)
-    {
-        std::lock_guard inner_lock(storage.export_manifests_mutex);
-
-        const auto destination_storage_id_name = manifest.destination_storage_id.getNameForLogs();
-        storage.export_manifests.erase(manifest);
-        throw Exception(ErrorCodes::UNKNOWN_TABLE, "Failed to reconstruct destination storage: {}", destination_storage_id_name);
-    }
+    const auto & destination_storage = manifest.destination_storage_ptr;
+    const auto destination_storage_id = destination_storage->getStorageID();
 
     auto exports_list_entry = storage.getContext()->getExportsList().insert(
         getStorageID(),
-        manifest.destination_storage_id,
+        destination_storage_id,
         manifest.data_part->getBytesOnDisk(),
         manifest.data_part->name,
         std::vector{},
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 858aa7f8f9cb..a1d4d967d58f 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -3,6 +3,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -6215,9 +6216,45 @@ void MergeTreeData::exportPartToTable(const PartitionCommand & command, ContextP
 
     const auto part_name = command.partition->as().value.safeGet();
 
-    const auto database_name = query_context->resolveDatabase(command.to_database);
+    if (!command.to_table_function)
+    {
+        const auto database_name = query_context->resolveDatabase(command.to_database);
+        exportPartToTable(part_name, StorageID{database_name, command.to_table}, generateSnowflakeIDString(), query_context);
+
+        return;
+    }
+
+    auto table_function_ast = command.to_table_function;
+    auto table_function_ptr = TableFunctionFactory::instance().get(command.to_table_function, query_context);
+
+    if (table_function_ptr->needStructureHint())
+    {
+        const auto source_metadata_ptr = getInMemoryMetadataPtr();
+
+        /// Grab only the readable columns from the source metadata to skip ephemeral columns
+        const auto readable_columns = ColumnsDescription(source_metadata_ptr->getColumns().getReadable());
+        table_function_ptr->setStructureHint(readable_columns);
+    }
+
+    if (command.partition_by_expr)
+    {
+        table_function_ptr->setPartitionBy(command.partition_by_expr);
+    }
+
+    auto dest_storage = table_function_ptr->execute(
+        table_function_ast,
+        query_context,
+        table_function_ptr->getName(),
+        /* cached_columns */ {},
+        /* use_global_context */ false,
+        /* is_insert_query */ true);
+
+    if (!dest_storage)
+    {
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to reconstruct destination storage");
+    }
 
-    exportPartToTable(part_name, StorageID{database_name, command.to_table}, generateSnowflakeIDString(), query_context);
+    exportPartToTable(part_name, dest_storage, generateSnowflakeIDString(), query_context);
 }
 
 void MergeTreeData::exportPartToTable(
@@ -6235,6 +6272,17 @@ void MergeTreeData::exportPartToTable(
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "Exporting to the same table is not allowed");
     }
 
+    exportPartToTable(part_name, dest_storage, transaction_id, query_context, allow_outdated_parts, completion_callback);
+}
+
+void MergeTreeData::exportPartToTable(
+    const std::string & part_name,
+    const StoragePtr & dest_storage,
+    const String & transaction_id,
+    ContextPtr query_context,
+    bool allow_outdated_parts,
+    std::function completion_callback)
+{
     if (!dest_storage->supportsImport())
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Destination storage {} does not support MergeTree parts or uses unsupported partitioning", dest_storage->getName());
 
@@ -6304,7 +6352,7 @@ void MergeTreeData::exportPartToTable(
     {
         const auto format_settings = getFormatSettings(query_context);
         MergeTreePartExportManifest manifest(
-            dest_storage->getStorageID(),
+            dest_storage,
             part,
             transaction_id,
             query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
@@ -6316,8 +6364,7 @@ void MergeTreeData::exportPartToTable(
 
         if (!export_manifests.emplace(std::move(manifest)).second)
         {
-            throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported to table '{}'",
-                part_name, dest_storage->getStorageID().getFullTableName());
+            throw Exception(ErrorCodes::ABORTED, "Data part '{}' is already being exported", part_name);
         }
     }
 
@@ -8750,8 +8797,9 @@ std::vector MergeTreeData::getExportsStatus() const
         status.source_database = source_database;
         status.source_table = source_table;
 
-        status.destination_database = manifest.destination_storage_id.database_name;
-        status.destination_table = manifest.destination_storage_id.table_name;
+        const auto destination_storage_id = manifest.destination_storage_ptr->getStorageID();
+        status.destination_database = destination_storage_id.database_name;
+        status.destination_table = destination_storage_id.table_name;
 
         status.create_time = manifest.create_time;
         status.part_name = manifest.data_part->name;
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index 732416ab7fd6..d56277d176a9 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -984,6 +984,14 @@ class MergeTreeData : public IStorage, public WithMutableContext
     void exportPartToTable(const PartitionCommand & command, ContextPtr query_context);
 
+    void exportPartToTable(
+        const std::string & part_name,
+        const StoragePtr & destination_storage,
+        const String & transaction_id,
+        ContextPtr query_context,
+        bool allow_outdated_parts = false,
+        std::function completion_callback = {});
+
     void exportPartToTable(
         const std::string & part_name,
         const StorageID & destination_storage_id,
diff --git a/src/Storages/MergeTree/MergeTreePartExportManifest.h b/src/Storages/MergeTree/MergeTreePartExportManifest.h
index db6626d22e0a..6cc5129b5efd 100644
--- a/src/Storages/MergeTree/MergeTreePartExportManifest.h
+++ b/src/Storages/MergeTree/MergeTreePartExportManifest.h
@@ -6,6 +6,7 @@
 #include
 #include
 #include
+#include
 
 namespace DB
 {
@@ -43,14 +44,14 @@ struct MergeTreePartExportManifest
     };
 
     MergeTreePartExportManifest(
-        const StorageID & destination_storage_id_,
+        const StoragePtr destination_storage_ptr_,
         const DataPartPtr & data_part_,
         const String & transaction_id_,
         FileAlreadyExistsPolicy file_already_exists_policy_,
         const Settings & settings_,
         const StorageMetadataPtr & metadata_snapshot_,
         std::function completion_callback_ = {})
-        : destination_storage_id(destination_storage_id_),
+        : destination_storage_ptr(destination_storage_ptr_),
         data_part(data_part_),
         transaction_id(transaction_id_),
         file_already_exists_policy(file_already_exists_policy_),
@@ -59,7 +60,7 @@ struct MergeTreePartExportManifest
         completion_callback(completion_callback_),
         create_time(time(nullptr)) {}
 
-    StorageID destination_storage_id;
+    StoragePtr destination_storage_ptr;
     DataPartPtr data_part;
     /// Used for killing the export.
     String transaction_id;
@@ -78,20 +79,12 @@ struct MergeTreePartExportManifest
 
     bool operator<(const MergeTreePartExportManifest & rhs) const
     {
-        // Lexicographic comparison: first compare destination storage, then part name
-        auto lhs_storage = destination_storage_id.getQualifiedName();
-        auto rhs_storage = rhs.destination_storage_id.getQualifiedName();
-
-        if (lhs_storage != rhs_storage)
-            return lhs_storage < rhs_storage;
-
         return data_part->name < rhs.data_part->name;
     }
 
     bool operator==(const MergeTreePartExportManifest & rhs) const
     {
-        return destination_storage_id.getQualifiedName() == rhs.destination_storage_id.getQualifiedName()
-            && data_part->name == rhs.data_part->name;
+        return data_part->name == rhs.data_part->name;
     }
 };
diff --git a/src/Storages/PartitionCommands.cpp b/src/Storages/PartitionCommands.cpp
index b8ef557604bc..d0e7b4895687 100644
--- a/src/Storages/PartitionCommands.cpp
+++ b/src/Storages/PartitionCommands.cpp
@@ -138,6 +138,12 @@ std::optional PartitionCommand::parse(const ASTAlterCommand *
         res.part = command_ast->part;
         res.to_database = command_ast->to_database;
         res.to_table = command_ast->to_table;
+        if (command_ast->to_table_function)
+        {
+            res.to_table_function = command_ast->to_table_function->ptr();
+            if (command_ast->partition_by_expr)
+                res.partition_by_expr = command_ast->partition_by_expr->clone();
+        }
         return res;
     }
     if (command_ast->type == ASTAlterCommand::EXPORT_PARTITION)
@@ -147,6 +153,12 @@ std::optional PartitionCommand::parse(const ASTAlterCommand *
         res.partition = command_ast->partition->clone();
         res.to_database = command_ast->to_database;
         res.to_table = command_ast->to_table;
+        if (command_ast->to_table_function)
+        {
+            res.to_table_function = command_ast->to_table_function->ptr();
+            if (command_ast->partition_by_expr)
+                res.partition_by_expr = command_ast->partition_by_expr->clone();
+        }
         return res;
     }
     return {};
 }
diff --git a/src/Storages/PartitionCommands.h b/src/Storages/PartitionCommands.h
index e3f36d0e7c1f..fe5df1bdc07a 100644
--- a/src/Storages/PartitionCommands.h
+++ b/src/Storages/PartitionCommands.h
@@ -52,10 +52,14 @@ struct PartitionCommand
     String from_table;
     bool replace = true;
 
-    /// For MOVE PARTITION
+    /// For MOVE PARTITION, EXPORT PART and EXPORT PARTITION
     String to_database;
     String to_table;
 
+    /// For EXPORT PART and EXPORT PARTITION with table functions
+    ASTPtr to_table_function;
+    ASTPtr partition_by_expr;
+
     /// For FETCH PARTITION - path in ZK to the shard, from which to download the partition.
     String from_zookeeper_path;
 
diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference
index 98b2247bc1fb..883191745d91 100644
--- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference
+++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.reference
@@ -42,6 +42,13 @@
 ---- Count rows in big_table and big_destination_max_rows
 4194304
 4194304
+---- Table function with schema inheritance (no schema specified)
+---- Data should be exported with inherited schema
+100 test1 2022
+101 test2 2022
+---- Table function with explicit compatible schema
+---- Data should be exported with explicit schema
+102 test3 2023
 ---- Test ALIAS columns export
 ---- Verify ALIAS column data in source table (arr_1 computed from arr[1])
 1 [1,2,3] 1
@@ -70,6 +77,10 @@
 ---- Verify mixed columns exported to S3 (should match source)
 1 5 10 15 TEST
 1 10 20 30 PROD
+---- Test Export to Table Function with mixed columns
+---- Verify mixed columns exported to S3
+1 5 10 15 TEST
+1 10 20 30 PROD
 ---- Test Complex Expressions in computed columns
 ---- Verify complex expressions in source table
 1 alice ALICE alice-1
diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh
index ea478d1b3df4..21e8fed0d2d9 100755
--- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh
+++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage.sh
@@ -18,6 +18,8 @@ mt_table_roundtrip="mt_table_roundtrip_${RANDOM}"
 big_table="big_table_${RANDOM}"
 big_destination_max_bytes="big_destination_max_bytes_${RANDOM}"
 big_destination_max_rows="big_destination_max_rows_${RANDOM}"
+tf_schema_inherit="tf_schema_inherit_${RANDOM}"
+tf_schema_explicit="tf_schema_explicit_${RANDOM}"
 mt_table_tf="mt_table_tf_${RANDOM}"
 mt_alias="mt_alias_${RANDOM}"
 mt_materialized="mt_materialized_${RANDOM}"
@@ -29,6 +31,7 @@ mt_complex_expr="mt_complex_expr_${RANDOM}"
 s3_complex_expr_export="s3_complex_expr_export_${RANDOM}"
 mt_ephemeral="mt_ephemeral_${RANDOM}"
 s3_ephemeral_export="s3_ephemeral_export_${RANDOM}"
+s3_mixed_export_table_function="s3_mixed_export_table_function_${RANDOM}"
 
 query() {
     $CLICKHOUSE_CLIENT --query "$1"
@@ -44,6 +47,8 @@ echo "---- Export 2020_1_1_0 and 2021_2_2_0"
 query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1"
 query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table SETTINGS allow_experimental_export_merge_tree_part = 1"
 
+sleep 3
+
 echo "---- Both data parts should appear"
 query "SELECT * FROM $s3_table ORDER BY id"
 
@@ -98,15 +103,18 @@ query "CREATE TABLE $big_destination_max_rows(id UInt64, data String, year UInt1
 
 # 4194304 is a number that came up during multiple iterations, it does not really mean anything (aside from the fact that the below numbers depend on it)
 query "INSERT INTO $big_table SELECT number AS id, repeat('x', 100) AS data, 2025 AS year FROM numbers(4194304)"
+query "INSERT INTO $big_table SELECT number AS id, repeat('x', 100) AS data, 2026 AS year FROM numbers(4194304)"
+
 # make sure we have only one part
 query "OPTIMIZE TABLE $big_table FINAL"
 
-big_part=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$big_table' AND partition_id = '2025' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')
+big_part_max_bytes=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$big_table' AND partition_id = '2025' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')
+big_part_max_rows=$(query "SELECT name FROM system.parts WHERE database = currentDatabase() AND table = '$big_table' AND partition_id = '2026' AND active = 1 ORDER BY name LIMIT 1" | tr -d '\n')
 
 # this should generate ~4 files
-query "ALTER TABLE $big_table EXPORT PART '$big_part' TO TABLE $big_destination_max_bytes SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_bytes_per_file=3500000, output_format_parquet_row_group_size_bytes=1000000"
+query "ALTER TABLE $big_table EXPORT PART '$big_part_max_bytes' TO TABLE $big_destination_max_bytes SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_bytes_per_file=3500000, output_format_parquet_row_group_size_bytes=1000000"
 
 # export_merge_tree_part_max_rows_per_file = 1048576 (which is 4194304/4) to generate 4 files
-query "ALTER TABLE $big_table EXPORT PART '$big_part' TO TABLE $big_destination_max_rows SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_rows_per_file=1048576"
+query "ALTER TABLE $big_table EXPORT PART '$big_part_max_rows' TO TABLE $big_destination_max_rows SETTINGS allow_experimental_export_merge_tree_part = 1, export_merge_tree_part_max_rows_per_file=1048576"
 
 # sleeping a little longer because it will write multiple files, trying not be flaky
 sleep 20
@@ -115,16 +123,35 @@ echo "---- Count files in big_destination_max_bytes, should be 5 (4 parquet, 1 c
 query "SELECT count(_file) FROM s3(s3_conn, filename='$big_destination_max_bytes/**', format='One')"
 
 echo "---- Count rows in big_table and big_destination_max_bytes"
-query "SELECT COUNT() from $big_table"
+query "SELECT COUNT() from $big_table WHERE year = 2025"
 query "SELECT COUNT() from $big_destination_max_bytes"
 
 echo "---- Count files in big_destination_max_rows, should be 5 (4 parquet, 1 commit)"
 query "SELECT count(_file) FROM s3(s3_conn, filename='$big_destination_max_rows/**', format='One')"
 
 echo "---- Count rows in big_table and big_destination_max_rows"
-query "SELECT COUNT() from $big_table"
+query "SELECT COUNT() from $big_table WHERE year = 2026"
 query "SELECT COUNT() from $big_destination_max_rows"
 
+echo "---- Table function with schema inheritance (no schema specified)"
+query "CREATE TABLE $mt_table_tf (id UInt64, value String, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()"
+query "INSERT INTO $mt_table_tf VALUES (100, 'test1', 2022), (101, 'test2', 2022), (102, 'test3', 2023)"
+
+query "ALTER TABLE $mt_table_tf EXPORT PART '2022_1_1_0' TO TABLE FUNCTION s3(s3_conn, filename='$tf_schema_inherit', format='Parquet', partition_strategy='hive') PARTITION BY year SETTINGS allow_experimental_export_merge_tree_part = 1"
+
+sleep 3
+
+echo "---- Data should be exported with inherited schema"
+query "SELECT * FROM s3(s3_conn, filename='$tf_schema_inherit/**.parquet') ORDER BY id"
+
+echo "---- Table function with explicit compatible schema"
+query "ALTER TABLE $mt_table_tf EXPORT PART '2023_2_2_0' TO TABLE FUNCTION s3(s3_conn, filename='$tf_schema_explicit', format='Parquet', structure='id UInt64, value String, year UInt16', partition_strategy='hive') PARTITION BY year SETTINGS allow_experimental_export_merge_tree_part = 1"
+
+sleep 3
+
+echo "---- Data should be exported with explicit schema"
+query "SELECT * FROM s3(s3_conn, filename='$tf_schema_explicit/**.parquet') ORDER BY id"
+
 echo "---- Test ALIAS columns export"
 query "CREATE TABLE $mt_alias (a UInt32, arr Array(UInt64), arr_1 UInt64 ALIAS arr[1]) ENGINE = MergeTree() PARTITION BY a ORDER BY (a, arr[1]) SETTINGS index_granularity = 1"
 query "CREATE TABLE $s3_alias_export (a UInt32, arr Array(UInt64), arr_1 UInt64) ENGINE = S3(s3_conn, filename='$s3_alias_export', format=Parquet, partition_strategy='hive') PARTITION BY a"
@@ -219,6 +246,15 @@ query "SELECT id, value, doubled, tripled, tag FROM $mt_mixed ORDER BY value"
 
 echo "---- Verify mixed columns exported to S3 (should match source)"
 query "SELECT id, value, doubled, tripled, tag FROM $s3_mixed_export ORDER BY value"
 
+echo "---- Test Export to Table Function with mixed columns"
+
+query "ALTER TABLE $mt_mixed EXPORT PART '$mixed_part' TO TABLE FUNCTION s3(s3_conn, filename='$s3_mixed_export_table_function', format=Parquet, partition_strategy='hive') PARTITION BY id SETTINGS allow_experimental_export_merge_tree_part = 1"
+
+sleep 3
+
+echo "---- Verify mixed columns exported to S3"
+query "SELECT * FROM s3(s3_conn, filename='$s3_mixed_export_table_function/**.parquet', format=Parquet) ORDER BY value"
+
 echo "---- Test Complex Expressions in computed columns"
 query "CREATE TABLE $mt_complex_expr (
     id UInt32,
@@ -248,4 +284,4 @@ query "SELECT id, name, upper_name, concat_result FROM $mt_complex_expr ORDER BY
 
 echo "---- Verify complex expressions exported to S3 (should match source)"
 query "SELECT id, name, upper_name, concat_result FROM $s3_complex_expr_export ORDER BY name"
 
-query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function, $big_table, $big_destination_max_bytes, $big_destination_max_rows, $mt_alias, $mt_materialized, $s3_alias_export, $s3_materialized_export, $mt_ephemeral, $s3_ephemeral_export, $mt_mixed, $s3_mixed_export, $mt_complex_expr, $s3_complex_expr_export"
+query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $mt_table_tf, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function, $big_table, $big_destination_max_bytes, $big_destination_max_rows, $mt_alias, $mt_materialized, $s3_alias_export, $s3_materialized_export, $mt_ephemeral, $s3_ephemeral_export, $mt_mixed, $s3_mixed_export, $mt_complex_expr, $s3_complex_expr_export"
diff --git a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql
index f92f6607646c..4b235b3b8372 100644
--- a/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql
+++ b/tests/queries/0_stateless/03572_export_merge_tree_part_to_object_storage_simple.sql
@@ -19,6 +19,12 @@ CREATE TABLE 03572_invalid_schema_table (id UInt64, year UInt16) ENGINE = S3(s3_
 
 ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE 03572_invalid_schema_table SETTINGS allow_experimental_export_merge_tree_part = 1; -- {serverError NOT_IMPLEMENTED}
 
+-- Not a table function, should throw
+ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION extractKeyValuePairs('name:ronaldo'); -- {serverError UNKNOWN_FUNCTION}
+
+-- It is a table function, but the engine does not support exports/imports, should throw
+ALTER TABLE 03572_mt_table EXPORT PART '2020_1_1_0' TO TABLE FUNCTION url('a.parquet'); -- {serverError NOT_IMPLEMENTED}
+
 -- Test that destination table can not have a column that matches the source ephemeral
 CREATE TABLE 03572_ephemeral_mt_table (id UInt64, year UInt16, name String EPHEMERAL) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple();
diff --git a/tests/queries/0_stateless/03604_parquet_many_files.reference b/tests/queries/0_stateless/03604_parquet_many_files.reference
new file mode 100644
index 000000000000..15c4c9a2c654
--- /dev/null
+++ b/tests/queries/0_stateless/03604_parquet_many_files.reference
@@ -0,0 +1 @@
+4500
diff --git a/tests/queries/0_stateless/03604_parquet_many_files.sh b/tests/queries/0_stateless/03604_parquet_many_files.sh
new file mode 100755
index 000000000000..86321407d164
--- /dev/null
+++ b/tests/queries/0_stateless/03604_parquet_many_files.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+# Tags: no-fasttest
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+${CLICKHOUSE_LOCAL} -q "insert into function file('${CLICKHOUSE_TMP}/t0.parquet') select * from numbers(10)"
+for i in {1..99}
+do
+    cp "${CLICKHOUSE_TMP}/t0.parquet" "${CLICKHOUSE_TMP}/t${i}.parquet"
+done
+
+${CLICKHOUSE_LOCAL} -q "select sum(number) from file('${CLICKHOUSE_TMP}/t{0..99}.parquet') settings input_format_parquet_preserve_order=1, input_format_parquet_use_native_reader_v3=1"
+
+rm "${CLICKHOUSE_TMP}"/t{0..99}.parquet
\ No newline at end of file