diff --git a/docs/en/engines/table-engines/special/tiered-distributed.md b/docs/en/engines/table-engines/special/hybrid.md similarity index 56% rename from docs/en/engines/table-engines/special/tiered-distributed.md rename to docs/en/engines/table-engines/special/hybrid.md index 95a986ff48de..12df6cd859b8 100644 --- a/docs/en/engines/table-engines/special/tiered-distributed.md +++ b/docs/en/engines/table-engines/special/hybrid.md @@ -1,6 +1,6 @@ --- -description: 'Hybrid unions multiple data sources behind per-layer predicates so queries behave like a single table while data is migrated or tiered.' -slug: /engines/table-engines/special/tiered-distributed +description: 'Hybrid unions multiple data sources behind per-segment predicates so queries behave like a single table while data is migrated or tiered.' +slug: /engines/table-engines/special/hybrid title: 'Hybrid Table Engine' sidebar_label: 'Hybrid' sidebar_position: 11 @@ -9,7 +9,7 @@ sidebar_position: 11 # Hybrid table engine `Hybrid` builds on top of the [Distributed](./distributed.md) table engine. It lets you expose several data sources as one logical table and assign every source its own predicate. -The engine rewrites incoming queries so that each layer receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`, +The engine rewrites incoming queries so that each segment receives the original query plus its predicate. This keeps all of the Distributed optimisations (remote aggregation, `skip_unused_shards`, global JOIN pushdown, and so on) while you duplicate or migrate data across clusters, storage types, or formats. It keeps the same execution pipeline as `engine=Distributed` but can read from multiple underlying sources simultaneously—similar to `engine=Merge`—while still pushing logic down to each source. @@ -20,7 +20,21 @@ Typical use cases include: - Tiered storage, for example fresh data on a local cluster and historical data in S3. - Gradual roll-outs where only a subset of rows should be served from a new backend. -By giving mutually exclusive predicates to the layers (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source. +By giving mutually exclusive predicates to the segments (for example, `date < watermark` and `date >= watermark`), you ensure that each row is read from exactly one source. + +## Enable the engine + +The Hybrid engine is experimental. Enable it per session (or in the user profile) before creating tables: + +```sql +SET allow_experimental_hybrid_table = 1; +``` + +### Automatic type alignment + +Hybrid segments can evolve independently, so the same logical column may use different physical types. With the experimental setting `hybrid_table_auto_cast_columns = 1` **(enabled by default; requires `allow_experimental_analyzer = 1`)**, the engine inserts the necessary `CAST` operations into each rewritten query so that every segment returns the schema defined by the Hybrid table. You can opt out by setting the flag to `0` if it causes issues. + +Segment schemas are cached when you create or attach a Hybrid table. If you alter a segment later (for example, change a column type), refresh the Hybrid table (detach/attach or recreate it) so the cached headers stay in sync with the new schema; otherwise the auto-cast feature may miss the change and queries can still fail with header/type errors.
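For example, a minimal refresh after a segment's schema has changed (the table names `events_recent` and `events_hybrid` here are hypothetical):

```sql
-- Suppose a segment table changed a column type after the Hybrid table was created:
--   ALTER TABLE events_recent MODIFY COLUMN value UInt64;
-- Re-attach the Hybrid table so it re-reads the segment schemas and rebuilds the cached cast list.
DETACH TABLE events_hybrid;
ATTACH TABLE events_hybrid;
```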
## Engine definition @@ -39,14 +53,14 @@ You must pass at least two arguments – the first table function and its predic ### Arguments and behaviour - `table_function_n` must be a valid table function (for example `remote`, `remoteSecure`, `cluster`, `clusterAllReplicas`, `s3Cluster`) or a fully qualified table name (`database.table`). The first argument must be a table function—such as `remote` or `cluster`—because it instantiates the underlying `Distributed` storage. -- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the layer's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical. -- The query planner picks the same processing stage for every layer as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way. +- `predicate_n` must be an expression that can be evaluated on the table columns. The engine adds it to the segment's query with an additional `AND`, so expressions like `event_date >= '2025-09-01'` or `id BETWEEN 10 AND 15` are typical. +- The query planner picks the same processing stage for every segment as it does for the base `Distributed` plan, so remote aggregation, ORDER BY pushdown, `skip_unused_shards`, and the legacy/analyzer execution modes behave the same way. - `INSERT` statements are forwarded to the first table function only. If you need multi-destination writes, use explicit `INSERT` statements into the respective sources. -- Align schemas across the layers. ClickHouse builds a common header; if the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas. +- Align schemas across the segments. ClickHouse builds a common header and rejects creation if any segment misses a column defined in the Hybrid schema. If the physical types differ you may need to add casts on one side or in the query, just as you would when reading from heterogeneous replicas. ## Example: local cluster plus S3 historical tier -The following commands illustrate a two-layer layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files. +The following commands illustrate a two-segment layout. Hot data stays on a local ClickHouse cluster, while historical rows come from public S3 Parquet files. ```sql -- Local MergeTree table that keeps current data @@ -79,7 +93,7 @@ CREATE OR REPLACE TABLE btc_blocks ENGINE = Hybrid( s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN), date < '2025-09-01' ) AS btc_blocks_local; --- Writes target the first (remote) layer +-- Writes target the first (remote) segment INSERT INTO btc_blocks SELECT * FROM s3('s3://aws-public-blockchain/v1.0/btc/blocks/**.parquet', NOSIGN) @@ -103,4 +117,4 @@ GROUP BY date ORDER BY date; ``` -Because the predicates are applied inside every layer, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above. +Because the predicates are applied inside every segment, queries such as `ORDER BY`, `GROUP BY`, `LIMIT`, `JOIN`, and `EXPLAIN` behave as if you were reading from a single `Distributed` table. 
When sources expose different physical types (for example `FixedString(64)` versus `String` in Parquet), add explicit casts during ingestion or in the query, as shown above. diff --git a/src/Analyzer/Passes/HybridCastsPass.cpp b/src/Analyzer/Passes/HybridCastsPass.cpp new file mode 100644 index 000000000000..f40e7664df50 --- /dev/null +++ b/src/Analyzer/Passes/HybridCastsPass.cpp @@ -0,0 +1,150 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +namespace DB +{ + +namespace Setting +{ + extern const SettingsBool hybrid_table_auto_cast_columns; +} + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +namespace +{ + +/// Collect Hybrid table expressions that require casts to normalize headers across segments. +/// +/// Hybrid is currently exposed only as an engine (TableNode). If it ever gets a table function +/// wrapper, this visitor must also look at TableFunctionNode and unwrap to the underlying +/// StorageDistributed so cached casts can be picked up there as well. +class HybridCastTablesCollector : public InDepthQueryTreeVisitor +{ +public: + explicit HybridCastTablesCollector(std::unordered_map & cast_map_) + : cast_map(cast_map_) + {} + + static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr &) { return true; } + + void visitImpl(QueryTreeNodePtr & node) + { + const auto * table = node->as(); + if (!table) + return; + + const auto * storage = table->getStorage().get(); + if (const auto * distributed = typeid_cast(storage)) + { + ColumnsDescription to_cast = distributed->getColumnsToCast(); + if (!to_cast.empty()) + cast_map.emplace(node.get(), std::move(to_cast)); // repeated table_expression can overwrite + } + } + +private: + std::unordered_map & cast_map; +}; + +// Visitor replaces all usages of the column with CAST(column, type) in the query tree. +class HybridCastVisitor : public InDepthQueryTreeVisitor +{ +public: + HybridCastVisitor( + const std::unordered_map & cast_map_, + ContextPtr context_) + : cast_map(cast_map_) + , context(std::move(context_)) + {} + + bool shouldTraverseTopToBottom() const { return false; } + + static bool needChildVisit(QueryTreeNodePtr &, QueryTreeNodePtr & child) + { + /// Traverse all child nodes so casts also apply inside subqueries and UNION branches. 
+ (void)child; + return true; + } + + void visitImpl(QueryTreeNodePtr & node) + { + auto * column_node = node->as(); + if (!column_node) + return; + + auto column_source = column_node->getColumnSourceOrNull(); + if (!column_source) + return; + + auto it = cast_map.find(column_source.get()); + if (it == cast_map.end()) + return; + + const auto & column_name = column_node->getColumnName(); + auto expected_column_opt = it->second.tryGetPhysical(column_name); + if (!expected_column_opt) + return; + + auto column_clone = std::static_pointer_cast(column_node->clone()); + + auto cast_node = buildCastFunction(column_clone, expected_column_opt->type, context); + const auto & alias = node->getAlias(); + if (!alias.empty()) + cast_node->setAlias(alias); + else + cast_node->setAlias(expected_column_opt->name); + + node = cast_node; + } + +private: + const std::unordered_map & cast_map; + ContextPtr context; +}; + + +} // namespace + +void HybridCastsPass::run(QueryTreeNodePtr & query_tree_node, ContextPtr context) +{ + const auto & settings = context->getSettingsRef(); + if (!settings[Setting::hybrid_table_auto_cast_columns]) + return; + + auto * query = query_tree_node->as(); + if (!query) + return; + + std::unordered_map cast_map; + HybridCastTablesCollector collector(cast_map); + collector.visit(query_tree_node); + if (cast_map.empty()) + return; + + HybridCastVisitor visitor(cast_map, context); + visitor.visit(query_tree_node); +} + +} diff --git a/src/Analyzer/Passes/HybridCastsPass.h b/src/Analyzer/Passes/HybridCastsPass.h new file mode 100644 index 000000000000..6b3159d6e925 --- /dev/null +++ b/src/Analyzer/Passes/HybridCastsPass.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/// Adds CASTs for Hybrid segments when physical types differ from the Hybrid schema +/// +/// It normalizes headers coming from different segments when table structure in some segments +/// differs from the Hybrid table definition. For example column X is UInt32 in the Hybrid table, +/// but Int64 in an additional segment. +/// +/// Without these casts ConvertingActions may fail to reconcile mismatched headers when casts are impossible +/// (e.g. AggregateFunction states carry hashed data tied to their argument type and cannot be recast), for example: +/// "Conversion from AggregateFunction(uniq, Decimal(38, 0)) to AggregateFunction(uniq, UInt64) is not supported" +/// (CANNOT_CONVERT_TYPE). +/// +/// Per-segment casts are not reliable because WithMergeState strips aliases, so merged pipelines +/// from different segments would return different headers (with or without CAST), leading to errors +/// like "Cannot find column `max(value)` in source stream, there are only columns: [max(_CAST(value, 'UInt64'))]" +/// (THERE_IS_NO_COLUMN). 
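+/// Illustrative sketch (hypothetical schema): if the Hybrid table declares `value UInt64` while one
+/// segment physically stores `value Decimal(38, 0)`, the pass rewrites column references so the
+/// per-segment query reads max(_CAST(value, 'UInt64')) instead of max(value), keeping the headers
+/// returned by all segments identical to the schema declared by the Hybrid table.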
+class HybridCastsPass : public IQueryTreePass +{ +public: + String getName() override { return "HybridCastsPass"; } + String getDescription() override { return "Inject casts for Hybrid columns to match schema types"; } + void run(QueryTreeNodePtr & query_tree_node, ContextPtr context) override; +}; + +} diff --git a/src/Analyzer/QueryTreePassManager.cpp b/src/Analyzer/QueryTreePassManager.cpp index a818ad348020..3f94444cdc53 100644 --- a/src/Analyzer/QueryTreePassManager.cpp +++ b/src/Analyzer/QueryTreePassManager.cpp @@ -48,6 +48,7 @@ #include #include #include +#include #include namespace DB @@ -309,6 +310,8 @@ void addQueryTreePasses(QueryTreePassManager & manager, bool only_analyze) manager.addPass(std::make_unique()); manager.addPass(std::make_unique()); + + manager.addPass(std::make_unique()); } } diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 5a7530b752b8..30728305319c 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6926,6 +6926,12 @@ Allows creation of tables with the [TimeSeries](../../engines/table-engines/inte - 0 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is disabled. - 1 — the [TimeSeries](../../engines/table-engines/integrations/time-series.md) table engine is enabled. )", EXPERIMENTAL) \ + DECLARE(Bool, allow_experimental_hybrid_table, false, R"( +Allows creation of tables with the [Hybrid](../../engines/table-engines/special/hybrid.md) table engine. +)", EXPERIMENTAL) \ + DECLARE(Bool, hybrid_table_auto_cast_columns, true, R"( +Automatically cast columns to the schema defined in Hybrid tables when remote segments expose different physical types. Works only with analyzer. Enabled by default, does nothing if (experimental) Hybrid tables are disabled; disable it if it causes issues. Segment schemas are cached when the Hybrid table is created or attached; if a segment schema changes later, detach/attach or recreate the Hybrid table so the cached headers stay in sync. +)", 0) \ DECLARE(Bool, allow_experimental_codecs, false, R"( If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing). )", EXPERIMENTAL) \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index b8f5f9bea73c..3e3f5e3f7608 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -53,7 +53,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"export_merge_tree_partition_max_retries", 3, 3, "New setting."}, {"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."}, {"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."}, - {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."} + {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."}, + {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. 
Default enabled."}, + {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."} }); addSettingsChanges(settings_changes_history, "25.8", { diff --git a/src/Databases/enableAllExperimentalSettings.cpp b/src/Databases/enableAllExperimentalSettings.cpp index 705a27c0905e..215ed5ad9248 100644 --- a/src/Databases/enableAllExperimentalSettings.cpp +++ b/src/Databases/enableAllExperimentalSettings.cpp @@ -61,6 +61,7 @@ void enableAllExperimentalSettings(ContextMutablePtr context) context->setSetting("allow_experimental_ytsaurus_table_engine", 1); context->setSetting("allow_experimental_ytsaurus_dictionary_source", 1); context->setSetting("allow_experimental_time_series_aggregate_functions", 1); + context->setSetting("allow_experimental_hybrid_table", 1); context->setSetting("allow_experimental_lightweight_update", 1); context->setSetting("allow_experimental_insert_into_iceberg", 1); context->setSetting("allow_experimental_iceberg_compaction", 1); diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index a3ffad98a47b..d518887b3f6d 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -104,6 +104,7 @@ #include #include +#include #include #include @@ -117,9 +118,9 @@ #include #include #include - #include #include +#include namespace fs = std::filesystem; @@ -169,6 +170,8 @@ void replaceCurrentDatabaseFunction(ASTPtr & ast, const ContextPtr & context) for (auto & child : ast->children) replaceCurrentDatabaseFunction(child, context); } + + } namespace Setting @@ -201,6 +204,7 @@ namespace Setting extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas; extern const SettingsBool prefer_global_in_and_join; extern const SettingsBool enable_global_with_statement; + extern const SettingsBool allow_experimental_hybrid_table; } namespace DistributedSetting @@ -234,6 +238,7 @@ namespace ErrorCodes extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES; extern const int ARGUMENT_OUT_OF_BOUND; extern const int TOO_LARGE_DISTRIBUTED_DEPTH; + extern const int SUPPORT_IS_DISABLED; } namespace ActionLocks @@ -548,8 +553,8 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( return QueryProcessingStage::WithMergeableState; // TODO: check logic - if (!additional_table_functions.empty()) - nodes += additional_table_functions.size(); + if (!segments.empty()) + nodes += segments.size(); /// If there is only one node, the query can be fully processed by the /// shard, initiator will work as a proxy only. 
@@ -593,7 +598,7 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage( bool StorageDistributed::isShardingKeySuitsQueryTreeNodeExpression( const QueryTreeNodePtr & expr, const SelectQueryInfo & query_info) const { - if (!additional_table_functions.empty()) + if (!segments.empty()) return false; ColumnsWithTypeAndName empty_input_columns; @@ -636,7 +641,7 @@ bool StorageDistributed::isShardingKeySuitsQueryTreeNodeExpression( return allOutputsDependsOnlyOnAllowedNodes(sharding_key_dag, irreducibe_nodes, matches); } -// TODO: support additional table functions +// TODO: support additional segments std::optional StorageDistributed::getOptimizedQueryProcessingStageAnalyzer(const SelectQueryInfo & query_info, const Settings & settings) const { bool optimize_sharding_key_aggregation = settings[Setting::optimize_skip_unused_shards] && settings[Setting::optimize_distributed_group_by_sharding_key] @@ -695,7 +700,7 @@ std::optional StorageDistributed::getOptimizedQueryP return QueryProcessingStage::Complete; } -// TODO: support additional table functions +// TODO: support additional segments std::optional StorageDistributed::getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const { bool optimize_sharding_key_aggregation = settings[Setting::optimize_skip_unused_shards] && settings[Setting::optimize_distributed_group_by_sharding_key] @@ -805,11 +810,11 @@ static bool requiresObjectColumns(const ColumnsDescription & all_columns, ASTPtr StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const { - /// TODO: support additional table functions + /// TODO: support additional segments return getStorageSnapshotForQuery(metadata_snapshot, nullptr, query_context); } -/// TODO: support additional table functions +/// TODO: support additional segments StorageSnapshotPtr StorageDistributed::getStorageSnapshotForQuery( const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query, ContextPtr /*query_context*/) const { @@ -1013,7 +1018,6 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, replacement_table_expression->setAlias(query_info.table_expression->getAlias()); - QueryTreeNodePtr filter; if (additional_filter) @@ -1025,7 +1029,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info, QueryAnalysisPass(replacement_table_expression).run(filter, context); } - auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, std::move(replacement_table_expression)); + auto query_tree_to_modify = query_info.query_tree->cloneAndReplace(query_info.table_expression, replacement_table_expression); // Apply additional filter if provided if (filter) @@ -1076,6 +1080,38 @@ void StorageDistributed::read( std::vector additional_query_infos; const auto & settings = local_context->getSettingsRef(); + auto metadata_ptr = getInMemoryMetadataPtr(); + + auto describe_segment_target = [&](const HybridSegment & segment) -> String + { + if (segment.storage_id) + return segment.storage_id->getNameForLogs(); + if (segment.table_function_ast) + return segment.table_function_ast->formatForLogging(); + chassert(false, "Hybrid segment is missing both storage_id and table_function_ast"); + return String{""}; + }; + + auto describe_base_target = [&]() -> String + { + if (remote_table_function_ptr) + return remote_table_function_ptr->formatForLogging(); + if (!remote_database.empty()) + return remote_database + "." 
+ remote_table; + return remote_table; + }; + + String base_target = describe_base_target(); + + const bool log_hybrid_query_rewrites = (!segments.empty() || base_segment_predicate); + + auto log_rewritten_query = [&](const String & target, const ASTPtr & ast) + { + if (!log_hybrid_query_rewrites || !ast) + return; + + LOG_TRACE(log, "rewriteSelectQuery (target: {}) -> {}", target, ast->formatForLogging()); + }; if (settings[Setting::allow_experimental_analyzer]) { @@ -1087,7 +1123,7 @@ void StorageDistributed::read( query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, remote_storage_id, remote_table_function_ptr, - additional_filter); + base_segment_predicate); Block block = *InterpreterSelectQueryAnalyzer::getSampleBlock(query_tree_distributed, local_context, SelectQueryOptions(processed_stage).analyze()); /** For distributed tables we do not need constants in header, since we don't send them to remote servers. * Moreover, constants can break some functions like `hostName` that are constants only for local queries. @@ -1100,29 +1136,31 @@ void StorageDistributed::read( modified_query_info.query = queryNodeToDistributedSelectQuery(query_tree_distributed); modified_query_info.query_tree = std::move(query_tree_distributed); + log_rewritten_query(base_target, modified_query_info.query); - if (!additional_table_functions.empty()) + if (!segments.empty()) { - for (const auto & table_function_entry : additional_table_functions) + for (const auto & segment : segments) { - // Create a modified query info with the additional predicate + // Create a modified query info with the segment predicate SelectQueryInfo additional_query_info = query_info; auto additional_query_tree = buildQueryTreeDistributed(additional_query_info, query_info.initial_storage_snapshot ? query_info.initial_storage_snapshot : storage_snapshot, - table_function_entry.storage_id ? *table_function_entry.storage_id : StorageID::createEmpty(), - table_function_entry.storage_id ? nullptr : table_function_entry.table_function_ast, - table_function_entry.predicate_ast); + segment.storage_id ? *segment.storage_id : StorageID::createEmpty(), + segment.storage_id ? 
nullptr : segment.table_function_ast, + segment.predicate_ast); additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree); additional_query_info.query_tree = std::move(additional_query_tree); + log_rewritten_query(describe_segment_target(segment), additional_query_info.query); additional_query_infos.push_back(std::move(additional_query_info)); } } - // For empty shards - avoid early return if we have additional table functions - if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_table_functions.empty()) + // For empty shards - avoid early return if we have additional segments + if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) return; } else @@ -1132,36 +1170,38 @@ void StorageDistributed::read( modified_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, modified_query_info.query, remote_database, remote_table, remote_table_function_ptr, - additional_filter); + base_segment_predicate); + log_rewritten_query(base_target, modified_query_info.query); - if (!additional_table_functions.empty()) + if (!segments.empty()) { - for (const auto & table_function_entry : additional_table_functions) + for (const auto & segment : segments) { SelectQueryInfo additional_query_info = query_info; - if (table_function_entry.storage_id) + if (segment.storage_id) { additional_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, additional_query_info.query, - table_function_entry.storage_id->database_name, table_function_entry.storage_id->table_name, + segment.storage_id->database_name, segment.storage_id->table_name, nullptr, - table_function_entry.predicate_ast); + segment.predicate_ast); } else { additional_query_info.query = ClusterProxy::rewriteSelectQuery( local_context, additional_query_info.query, - "", "", table_function_entry.table_function_ast, - table_function_entry.predicate_ast); + "", "", segment.table_function_ast, + segment.predicate_ast); } + log_rewritten_query(describe_segment_target(segment), additional_query_info.query); additional_query_infos.push_back(std::move(additional_query_info)); } } - // For empty shards - avoid early return if we have additional table functions - if (modified_query_info.getCluster()->getShardsInfo().empty() && additional_table_functions.empty()) + // For empty shards - avoid early return if we have additional segments + if (modified_query_info.getCluster()->getShardsInfo().empty() && segments.empty()) { Pipe pipe(std::make_shared(header)); auto read_from_pipe = std::make_unique(std::move(pipe)); @@ -1184,7 +1224,7 @@ void StorageDistributed::read( processed_stage); auto shard_filter_generator = ClusterProxy::getShardFilterGeneratorForCustomKey( - *modified_query_info.getCluster(), local_context, getInMemoryMetadataPtr()->columns); + *modified_query_info.getCluster(), local_context, metadata_ptr->columns); ClusterProxy::executeQuery( query_plan, @@ -2190,17 +2230,36 @@ void StorageDistributed::delayInsertOrThrowIfNeeded() const } } -void StorageDistributed::setHybridLayout(std::vector additional_table_functions_) +void StorageDistributed::setHybridLayout(std::vector segments_) { - additional_table_functions = std::move(additional_table_functions_); + segments = std::move(segments_); log = getLogger("Hybrid (" + getStorageID().table_name + ")"); auto virtuals = createVirtuals(); - // or _layer_index? 
- virtuals.addEphemeral("_table_index", std::make_shared(), "Index of the table function in Hybrid (0 for main table, 1+ for additional table functions)"); + // or _segment_index? + virtuals.addEphemeral("_table_index", std::make_shared(), "Index of the table function in Hybrid (0 for main table, 1+ for additional segments)"); setVirtuals(virtuals); } +void StorageDistributed::setCachedColumnsToCast(ColumnsDescription columns) +{ + cached_columns_to_cast = std::move(columns); + if (!cached_columns_to_cast.empty() && log) + { + Names columns_with_types; + columns_with_types.reserve(cached_columns_to_cast.getAllPhysical().size()); + for (const auto & col : cached_columns_to_cast.getAllPhysical()) + columns_with_types.emplace_back(col.name + " " + col.type->getName()); + LOG_DEBUG(log, "Hybrid auto-cast will apply to: [{}]", fmt::join(columns_with_types, ", ")); + } +} + +ColumnsDescription StorageDistributed::getColumnsToCast() const +{ + return cached_columns_to_cast; +} + + void registerStorageDistributed(StorageFactory & factory) { factory.registerStorage("Distributed", [](const StorageFactory::Arguments & args) @@ -2307,8 +2366,6 @@ void registerStorageDistributed(StorageFactory & factory) void registerStorageHybrid(StorageFactory & factory) { - // Register Hybrid engine - // TODO: consider moving it to a separate file / subclass of StorageDistributed factory.registerStorage("Hybrid", [](const StorageFactory::Arguments & args) -> StoragePtr { ASTs & engine_args = args.engine_args; @@ -2322,6 +2379,13 @@ void registerStorageHybrid(StorageFactory & factory) if (!local_context) local_context = global_context; + if (args.mode <= LoadingStrictnessLevel::CREATE + && !local_context->getSettingsRef()[Setting::allow_experimental_hybrid_table]) + { + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, + "Experimental Hybrid table engine is not enabled (the setting 'allow_experimental_hybrid_table')"); + } + // Validate first argument - must be a table function ASTPtr first_arg = engine_args[0]; if (const auto * func = first_arg->as()) @@ -2348,13 +2412,34 @@ void registerStorageHybrid(StorageFactory & factory) if (!table_function) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid table function in Hybrid engine"); - // For schema inference, we need to determine the columns first if they're not provided + // Capture the physical columns reported by the first segment (table function) + ColumnsDescription first_segment_columns = table_function->getActualTableStructure(local_context, true); + + // For schema inference, prefer user-provided columns, otherwise use the physical ones ColumnsDescription columns_to_use = args.columns; if (columns_to_use.empty()) + columns_to_use = first_segment_columns; + + NameSet columns_to_cast_names; + auto validate_segment_schema = [&](const ColumnsDescription & segment_columns, const String & segment_name) { - // Get the column structure from the table function - columns_to_use = table_function->getActualTableStructure(local_context, true); - } + for (const auto & column : columns_to_use.getAllPhysical()) + { + auto found = segment_columns.tryGetPhysical(column.name); + if (!found) + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Hybrid segment {} is missing column '{}' required by Hybrid schema", + segment_name, column.name); + } + + if (!found->type->equals(*column.type)) + columns_to_cast_names.emplace(column.name); + } + }; + + validate_segment_schema(first_segment_columns, engine_args[0]->formatForLogging()); // Execute the table function to get the 
underlying storage StoragePtr storage = table_function->execute( @@ -2400,10 +2485,10 @@ void registerStorageHybrid(StorageFactory & factory) ASTPtr second_arg = engine_args[1]; validate_predicate(second_arg, 1); - distributed_storage->setAdditionalFilter(second_arg); + distributed_storage->setBaseSegmentPredicate(second_arg); // Parse additional table function pairs (if any) - std::vector additional_table_functions; + std::vector segment_definitions; for (size_t i = 2; i < engine_args.size(); i += 2) { if (i + 1 >= engine_args.size()) @@ -2429,11 +2514,13 @@ void registerStorageHybrid(StorageFactory & factory) // TableFunctionFactory::get mutates the AST in-place inside TableFunctionRemote::parseArguments. ASTPtr normalized_table_function_ast = table_function_ast->clone(); auto additional_table_function = TableFunctionFactory::instance().get(normalized_table_function_ast, local_context); - (void)additional_table_function; + ColumnsDescription segment_columns = additional_table_function->getActualTableStructure(local_context, true); replaceCurrentDatabaseFunction(normalized_table_function_ast, local_context); - // It's a table function - store the AST for later execution - additional_table_functions.emplace_back(normalized_table_function_ast, predicate_ast); + validate_segment_schema(segment_columns, normalized_table_function_ast->formatForLogging()); + + // It's a table function - store the AST and cached schema for later execution + segment_definitions.emplace_back(normalized_table_function_ast, predicate_ast); } else if (const auto * ast_identifier = table_function_ast->as()) { @@ -2445,16 +2532,34 @@ void registerStorageHybrid(StorageFactory & factory) "Argument #{}: identifier '{}' cannot be converted to table identifier", i, ast_identifier->name()); } + StoragePtr validated_table; try { // Parse table identifier to get StorageID StorageID storage_id(table_identifier); - // Sanity check: ensure the table identifier is fully qualified (has database name) + // Fill database for unqualified identifiers using current database (or the target table database). if (storage_id.database_name.empty()) { - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Argument #{}: table identifier '{}' must be fully qualified (database.table)", i, ast_identifier->name()); + String default_database = local_context->getCurrentDatabase(); + if (default_database.empty()) + default_database = args.table_id.database_name; + + if (default_database.empty()) + { + throw Exception(ErrorCodes::UNKNOWN_DATABASE, + "Argument #{}: table identifier '{}' does not specify database and no default database is selected", + i, ast_identifier->name()); + } + + storage_id.database_name = default_database; + + // Update AST so the table definition stores a fully qualified name. 
+ auto qualified_identifier = std::make_shared(storage_id.database_name, storage_id.table_name); + qualified_identifier->alias = ast_identifier->alias; + qualified_identifier->prefer_alias_to_column_name = ast_identifier->prefer_alias_to_column_name; + table_function_ast = qualified_identifier; + engine_args[i] = table_function_ast; } // Sanity check: verify the table exists @@ -2473,6 +2578,7 @@ void registerStorageHybrid(StorageFactory & factory) throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table '{}.{}' does not exist", storage_id.database_name, storage_id.table_name); } + validated_table = table; } catch (const Exception & e) { @@ -2480,7 +2586,14 @@ void registerStorageHybrid(StorageFactory & factory) "Argument #{}: table '{}' validation failed: {}", i, ast_identifier->name(), e.message()); } - additional_table_functions.emplace_back(table_function_ast, predicate_ast, storage_id); + ColumnsDescription segment_columns; + + if (validated_table) + segment_columns = validated_table->getInMemoryMetadataPtr()->getColumns(); + + validate_segment_schema(segment_columns, storage_id.getNameForLogs()); + + segment_definitions.emplace_back(table_function_ast, predicate_ast, storage_id); } catch (const Exception & e) { @@ -2501,8 +2614,18 @@ void registerStorageHybrid(StorageFactory & factory) // but we need to rename it to the correct database and table names distributed_storage->renameInMemory({args.table_id.database_name, args.table_id.table_name, args.table_id.uuid}); - // Store additional table functions for later use - distributed_storage->setHybridLayout(std::move(additional_table_functions)); + // Store segment definitions for later use + distributed_storage->setHybridLayout(std::move(segment_definitions)); + if (!columns_to_cast_names.empty()) + { + NamesAndTypesList cast_cols; + for (const auto & col : columns_to_use.getAllPhysical()) + { + if (columns_to_cast_names.contains(col.name)) + cast_cols.emplace_back(col.name, col.type); + } + distributed_storage->setCachedColumnsToCast(ColumnsDescription(cast_cols)); + } return distributed_storage; }, diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 273a1c233c81..9526fc647fbc 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -50,19 +50,21 @@ class StorageDistributed final : public IStorage, WithContext friend class StorageSystemDistributionQueue; public: - /// Structure to hold table function AST, predicate, and optional StorageID for table identifiers - struct TableFunctionEntry + /// Structure to hold table function AST, predicate, optional StorageID, and cached physical columns for the segment. + /// Cached columns let us detect schema mismatches and enable features like hybrid_table_auto_cast_columns without + /// re-fetching remote headers on every query. 
+ struct HybridSegment { ASTPtr table_function_ast; ASTPtr predicate_ast; std::optional storage_id; // For table identifiers instead of table functions - TableFunctionEntry(ASTPtr table_function_ast_, ASTPtr predicate_ast_) + HybridSegment(ASTPtr table_function_ast_, ASTPtr predicate_ast_) : table_function_ast(std::move(table_function_ast_)) , predicate_ast(std::move(predicate_ast_)) {} - TableFunctionEntry(ASTPtr table_function_ast_, ASTPtr predicate_ast_, StorageID storage_id_) + HybridSegment(ASTPtr table_function_ast_, ASTPtr predicate_ast_, StorageID storage_id_) : table_function_ast(std::move(table_function_ast_)) , predicate_ast(std::move(predicate_ast_)) , storage_id(std::move(storage_id_)) @@ -91,7 +93,7 @@ class StorageDistributed final : public IStorage, WithContext std::string getName() const override { - return (additional_table_functions.empty() && !additional_filter) + return (segments.empty() && !base_segment_predicate) ? "Distributed" : "Hybrid"; } @@ -173,14 +175,16 @@ class StorageDistributed final : public IStorage, WithContext size_t getShardCount() const; - /// Set additional filter for Hybrid engine - void setAdditionalFilter(ASTPtr filter) { additional_filter = std::move(filter); } + /// Set optional predicate applied to the base segment + void setBaseSegmentPredicate(ASTPtr predicate) { base_segment_predicate = std::move(predicate); } - /// Set additional table functions for Hybrid engine - void setHybridLayout(std::vector additional_table_functions_); + /// Set segment definitions for Hybrid engine along with cached schema info + void setHybridLayout(std::vector segments_); + void setCachedColumnsToCast(ColumnsDescription columns); /// Getter methods for ClusterProxy::executeQuery StorageID getRemoteStorageID() const { return remote_storage; } + ColumnsDescription getColumnsToCast() const; ExpressionActionsPtr getShardingKeyExpression() const { return sharding_key_expr; } const DistributedSettings * getDistributedSettings() const { return distributed_settings.get(); } bool isRemoteFunction() const { return is_remote_function; } @@ -321,10 +325,19 @@ class StorageDistributed final : public IStorage, WithContext bool is_remote_function; /// Additional filter expression for Hybrid engine - ASTPtr additional_filter; - - /// Additional table functions for Hybrid engine - std::vector additional_table_functions; + ASTPtr base_segment_predicate; + + /// Additional segments for Hybrid engine + std::vector segments; + + /// Hybrid builds the list of columns that need to be cast once, during CREATE/ATTACH: + /// these are the columns whose type differs from the expected one in at least one segment. + /// It is used by HybridCastsPass and the hybrid_table_auto_cast_columns feature; + /// without this cache, the segment headers would have to be read before every query, + /// which may trigger extra DESCRIBE calls for remote segments. + /// Subsequent segment DDL changes are not auto-detected;
+ ColumnsDescription cached_columns_to_cast; }; } diff --git a/tests/queries/0_stateless/03642_tiered_distributed.reference b/tests/queries/0_stateless/03643_hybrid.reference similarity index 70% rename from tests/queries/0_stateless/03642_tiered_distributed.reference rename to tests/queries/0_stateless/03643_hybrid.reference index d4ac38a21257..1954c097b478 100644 --- a/tests/queries/0_stateless/03642_tiered_distributed.reference +++ b/tests/queries/0_stateless/03643_hybrid.reference @@ -1,3 +1,4 @@ +Hybrid creation requires allow_experimental_hybrid_table Check Hybrid engine is registered Hybrid Ensure no leftovers before validation checks @@ -27,7 +28,7 @@ CREATE TABLE default.test_tiered_distributed_numbers_range\n(\n `number` UInt 2 3 4 -Create Hybrid table with two remote layers as table +Create Hybrid table with two remote segments as table CREATE TABLE default.test_tiered_distributed_numbers_dual\n(\n `number` UInt64\n)\nENGINE = Hybrid(remote(\'localhost:9000\', \'system.numbers\'), number < 5, remote(\'localhost:9000\', system.numbers), (number >= 10) AND (number <= 15))\nCOMMENT \'Generates all natural numbers, starting from 0 (to 2^64 - 1, and then again) in sorted order.\' 0 1 @@ -63,23 +64,24 @@ Create Hybrid table combining remote function and local table 13 14 15 -Verify Hybrid skips layer with always false predicate on the first layer +Verify Hybrid skips segment with always false predicate on the first segment 10 11 12 13 14 15 -Verify Hybrid skips layer with always false predicate on the second layer +Verify Hybrid skips segment with always false predicate on the second segment 0 1 2 -Prepare local MergeTree table for multi-layer tests +Hybrid raises when a segment is missing a column used by the base schema +Prepare local MergeTree table for multi-segment tests Populate local table with sample data -Create Hybrid table with three layer pairs -Count rows across all layers +Create Hybrid table with three segment pairs +Count rows across all segments 6 -Count rows from layers with id > 4 +Count rows from segments with id > 4 1 Count rows where value > 200 3 @@ -127,7 +129,7 @@ Verify additional_table_filters works consistently (legacy analyser) 2 Bob 200.3 Verify additional_table_filters works consistently (new analyser) 2 Bob 200.3 -Clean up Hybrid table with three layer pairs +Clean up Hybrid table with three segment pairs Clean up local helper table Drop predicate filtering fixtures if they exist Create local tables representing before/after watermark partitions @@ -135,7 +137,7 @@ Create second local table with different value type Insert rows before watermark into both tables Insert rows after watermark into both tables Create Hybrid table with analyzer disabled during reads -Insert row via Hybrid table (should go to first layer) +Insert row via Hybrid table (should go to first segment) Verify that inserted row landed in first table 17 John 2025-09-25 400 Verify that second table did not receive the inserted row @@ -172,14 +174,14 @@ Read predicate-filtered data with analyzer enabled and prefer localhost replica 21 Alice 2025-08-15 100 22 Bob 2025-08-20 200 23 Charlie 2025-08-25 300 -Check if the subqueries were recorded in query_log +Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 0) Row 1: ────── type: QueryFinish is_initial_query2: 1 tbl: ['_table_function.remote','db.test_tiered_watermark'] -qry: SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0, log_queries=1, 
serialize_query_plan=0, log_comment = 'test_tiered_watermark', max_threads=1 FORMAT Null; -log_comment: test_tiered_watermark +qry: SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 0, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark1', max_threads=1 FORMAT Null; +log_comment: test_tiered_watermark1 Row 2: ────── @@ -187,7 +189,7 @@ type: QueryFinish is_initial_query2: 0 tbl: ['db.test_tiered_watermark_after'] qry: SELECT `__table1`.`id` AS `id`, `__table1`.`name` AS `name`, `__table1`.`date` AS `date`, `__table1`.`value` AS `value` FROM `db`.`test_tiered_watermark_after` AS `__table1` WHERE `__table1`.`date` >= '2025-09-01' ORDER BY `__table1`.`id` DESC -log_comment: test_tiered_watermark +log_comment: test_tiered_watermark1 Row 3: ────── @@ -195,5 +197,29 @@ type: QueryFinish is_initial_query2: 0 tbl: ['db.test_tiered_watermark_before'] qry: SELECT `__table1`.`id` AS `id`, `__table1`.`name` AS `name`, `__table1`.`date` AS `date`, `__table1`.`value` AS `value` FROM `db`.`test_tiered_watermark_before` AS `__table1` WHERE `__table1`.`date` < '2025-09-01' ORDER BY `__table1`.`id` DESC -log_comment: test_tiered_watermark +log_comment: test_tiered_watermark1 +Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 1) +Row 1: +────── +type: QueryFinish +is_initial_query2: 1 +tbl: ['_table_function.remote','db.test_tiered_watermark'] +qry: SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark2', max_threads=1 FORMAT Null; +log_comment: test_tiered_watermark2 + +Row 2: +────── +type: QueryFinish +is_initial_query2: 0 +tbl: ['db.test_tiered_watermark_after'] +qry: SELECT _CAST(`__table1`.`id`, 'UInt32') AS `id`, _CAST(`__table1`.`name`, 'String') AS `name`, `__table1`.`date` AS `date`, _CAST(`__table1`.`value`, 'UInt32') AS `value` FROM `db`.`test_tiered_watermark_after` AS `__table1` WHERE `__table1`.`date` >= '2025-09-01' ORDER BY `id` DESC +log_comment: test_tiered_watermark2 + +Row 3: +────── +type: QueryFinish +is_initial_query2: 0 +tbl: ['db.test_tiered_watermark_before'] +qry: SELECT _CAST(`__table1`.`id`, 'UInt32') AS `id`, _CAST(`__table1`.`name`, 'String') AS `name`, `__table1`.`date` AS `date`, _CAST(`__table1`.`value`, 'UInt32') AS `value` FROM `db`.`test_tiered_watermark_before` AS `__table1` WHERE `__table1`.`date` < '2025-09-01' ORDER BY _CAST(`__table1`.`id`, 'UInt32') DESC +log_comment: test_tiered_watermark2 Clean up predicate filtering tables diff --git a/tests/queries/0_stateless/03642_tiered_distributed.sql b/tests/queries/0_stateless/03643_hybrid.sql similarity index 71% rename from tests/queries/0_stateless/03642_tiered_distributed.sql rename to tests/queries/0_stateless/03643_hybrid.sql index 4fd346bba166..851045aafcfa 100644 --- a/tests/queries/0_stateless/03642_tiered_distributed.sql +++ b/tests/queries/0_stateless/03643_hybrid.sql @@ -1,4 +1,10 @@ --- Test Hybrid engine registration and basic validation +SELECT 'Hybrid creation requires allow_experimental_hybrid_table'; +SET allow_experimental_hybrid_table = 0; +CREATE TABLE test_hybrid_requires_setting (`dummy` UInt8) ENGINE = Hybrid(remote('localhost:9000'), 1); -- { serverError SUPPORT_IS_DISABLED } +DROP TABLE IF EXISTS test_hybrid_requires_setting SYNC; + +SET allow_experimental_hybrid_table = 1; + 
SELECT 'Check Hybrid engine is registered'; SELECT name FROM system.table_engines WHERE name = 'Hybrid'; @@ -49,7 +55,7 @@ SHOW CREATE TABLE test_tiered_distributed_numbers_range; SELECT * FROM test_tiered_distributed_numbers_range ORDER BY number; DROP TABLE IF EXISTS test_tiered_distributed_numbers_range SYNC; -SELECT 'Create Hybrid table with two remote layers as table'; +SELECT 'Create Hybrid table with two remote segments as table'; DROP TABLE IF EXISTS test_tiered_distributed_numbers_dual SYNC; CREATE TABLE test_tiered_distributed_numbers_dual ENGINE = Hybrid( remote('localhost:9000', system.numbers), number < 5, @@ -73,7 +79,7 @@ CREATE TABLE test_tiered_distributed_numbers_mixed SELECT * FROM test_tiered_distributed_numbers_mixed ORDER BY number; DROP TABLE IF EXISTS test_tiered_distributed_numbers_mixed SYNC; -SELECT 'Verify Hybrid skips layer with always false predicate on the first layer'; +SELECT 'Verify Hybrid skips segment with always false predicate on the first segment'; DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_first SYNC; CREATE TABLE test_tiered_distributed_numbers_skip_first ( @@ -85,7 +91,7 @@ CREATE TABLE test_tiered_distributed_numbers_skip_first SELECT * FROM test_tiered_distributed_numbers_skip_first ORDER BY number; DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_first SYNC; -SELECT 'Verify Hybrid skips layer with always false predicate on the second layer'; +SELECT 'Verify Hybrid skips segment with always false predicate on the second segment'; DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_second SYNC; CREATE TABLE test_tiered_distributed_numbers_skip_second ( @@ -97,9 +103,41 @@ CREATE TABLE test_tiered_distributed_numbers_skip_second SELECT * FROM test_tiered_distributed_numbers_skip_second ORDER BY number; DROP TABLE IF EXISTS test_tiered_distributed_numbers_skip_second SYNC; +SELECT 'Hybrid raises when a segment is missing a column used by the base schema'; +DROP TABLE IF EXISTS test_hybrid_segment_full SYNC; +DROP TABLE IF EXISTS test_hybrid_segment_partial SYNC; +DROP TABLE IF EXISTS test_hybrid_missing_column SYNC; + +CREATE TABLE test_hybrid_segment_full +( + `id` UInt32, + `value` UInt32 +) +ENGINE = MergeTree() +ORDER BY id; + +CREATE TABLE test_hybrid_segment_partial +( + `id` UInt32 +) +ENGINE = MergeTree() +ORDER BY id; + +INSERT INTO test_hybrid_segment_full VALUES (1, 10), (2, 20); +INSERT INTO test_hybrid_segment_partial VALUES (3), (4); + +CREATE TABLE test_hybrid_missing_column ENGINE = Hybrid( + remote('localhost:9000', currentDatabase(), 'test_hybrid_segment_full'), id < 3, + remote('localhost:9000', currentDatabase(), 'test_hybrid_segment_partial'), id >= 3 +); -- { serverError BAD_ARGUMENTS } + +DROP TABLE IF EXISTS test_hybrid_missing_column SYNC; +DROP TABLE IF EXISTS test_hybrid_segment_partial SYNC; +DROP TABLE IF EXISTS test_hybrid_segment_full SYNC; + ----------------------------- -SELECT 'Prepare local MergeTree table for multi-layer tests'; +SELECT 'Prepare local MergeTree table for multi-segment tests'; DROP TABLE IF EXISTS test_tiered_local_data SYNC; CREATE TABLE test_tiered_local_data ( @@ -119,10 +157,10 @@ INSERT INTO test_tiered_local_data VALUES (4, 'David', '2022-01-04 13:00:00', 300.2), (5, 'Eve', '2022-01-05 14:00:00', 250.1); -SELECT 'Create Hybrid table with three layer pairs'; -DROP TABLE IF EXISTS test_tiered_multi_layer SYNC; +SELECT 'Create Hybrid table with three segment pairs'; +DROP TABLE IF EXISTS test_tiered_multi_segment SYNC; -CREATE TABLE test_tiered_multi_layer +CREATE 
TABLE test_tiered_multi_segment ( `id` UInt32, `name` String, @@ -138,52 +176,52 @@ ENGINE = Hybrid( id > 3 ); -SELECT 'Count rows across all layers'; -SELECT count() FROM test_tiered_multi_layer; -SELECT 'Count rows from layers with id > 4'; -SELECT count() FROM test_tiered_multi_layer WHERE id > 4; +SELECT 'Count rows across all segments'; +SELECT count() FROM test_tiered_multi_segment; +SELECT 'Count rows from segments with id > 4'; +SELECT count() FROM test_tiered_multi_segment WHERE id > 4; SELECT 'Count rows where value > 200'; -SELECT count() FROM test_tiered_multi_layer WHERE value > 200; +SELECT count() FROM test_tiered_multi_segment WHERE value > 200; SELECT 'Count rows named Alice'; -SELECT count() AS alice_rows FROM test_tiered_multi_layer WHERE name = 'Alice'; +SELECT count() AS alice_rows FROM test_tiered_multi_segment WHERE name = 'Alice'; SELECT 'Select rows ordered by value descending (id > 2)'; -SELECT id, name, value FROM test_tiered_multi_layer WHERE id > 2 ORDER BY value DESC; +SELECT id, name, value FROM test_tiered_multi_segment WHERE id > 2 ORDER BY value DESC; SELECT 'Limit results ordered by id'; -SELECT * FROM test_tiered_multi_layer ORDER BY id LIMIT 3; +SELECT * FROM test_tiered_multi_segment ORDER BY id LIMIT 3; SELECT 'Explain plan for filter on value'; -EXPLAIN SELECT * FROM test_tiered_multi_layer WHERE value > 150 SETTINGS prefer_localhost_replica=0, enable_analyzer=0; -EXPLAIN SELECT * FROM test_tiered_multi_layer WHERE value > 150 SETTINGS prefer_localhost_replica=0, enable_analyzer=1; -EXPLAIN SELECT * FROM test_tiered_multi_layer WHERE value > 150 SETTINGS prefer_localhost_replica=1, enable_analyzer=0; -EXPLAIN SELECT * FROM test_tiered_multi_layer WHERE value > 150 SETTINGS prefer_localhost_replica=1, enable_analyzer=1; +EXPLAIN SELECT * FROM test_tiered_multi_segment WHERE value > 150 SETTINGS prefer_localhost_replica=0, enable_analyzer=0; +EXPLAIN SELECT * FROM test_tiered_multi_segment WHERE value > 150 SETTINGS prefer_localhost_replica=0, enable_analyzer=1; +EXPLAIN SELECT * FROM test_tiered_multi_segment WHERE value > 150 SETTINGS prefer_localhost_replica=1, enable_analyzer=0; +EXPLAIN SELECT * FROM test_tiered_multi_segment WHERE value > 150 SETTINGS prefer_localhost_replica=1, enable_analyzer=1; SELECT 'Aggregate values across name when filtering by event_time'; SELECT name, count() AS count, avg(value) AS avg_value -FROM test_tiered_multi_layer +FROM test_tiered_multi_segment WHERE event_time >= '2022-01-02' GROUP BY name ORDER BY avg_value DESC; SELECT 'Verify additional_table_filters works consistently (legacy analyser)'; SELECT id, name, value -FROM test_tiered_multi_layer +FROM test_tiered_multi_segment WHERE id < 3 ORDER BY id -SETTINGS additional_table_filters = {'test_tiered_multi_layer' : 'id > 1'}, allow_experimental_analyzer = 0; +SETTINGS additional_table_filters = {'test_tiered_multi_segment' : 'id > 1'}, allow_experimental_analyzer = 0; SELECT 'Verify additional_table_filters works consistently (new analyser)'; SELECT id, name, value -FROM test_tiered_multi_layer +FROM test_tiered_multi_segment WHERE id < 3 ORDER BY id -SETTINGS additional_table_filters = {'test_tiered_multi_layer' : 'id > 1'}, allow_experimental_analyzer = 1; +SETTINGS additional_table_filters = {'test_tiered_multi_segment' : 'id > 1'}, allow_experimental_analyzer = 1; -SELECT 'Clean up Hybrid table with three layer pairs'; -DROP TABLE IF EXISTS test_tiered_multi_layer SYNC; +SELECT 'Clean up Hybrid table with three segment pairs'; +DROP TABLE IF EXISTS 
test_tiered_multi_segment SYNC; SELECT 'Clean up local helper table'; DROP TABLE IF EXISTS test_tiered_local_data SYNC; @@ -254,7 +292,7 @@ ENGINE = Hybrid( date < '2025-09-01' ); -SELECT 'Insert row via Hybrid table (should go to first layer)'; +SELECT 'Insert row via Hybrid table (should go to first segment)'; INSERT INTO test_tiered_watermark SETTINGS distributed_foreground_insert = 1 VALUES (17, 'John', '2025-09-25', 400); @@ -275,9 +313,33 @@ SELECT * FROM test_tiered_watermark ORDER BY id SETTINGS enable_analyzer = 1, pr -- other combinations of settings work, but give a bit different content in the query_log -- See the problem around is_initial_query described in https://github.com/Altinity/ClickHouse/issues/1077 -SELECT 'Check if the subqueries were recorded in query_log'; +SELECT 'Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 0)'; + +SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 0, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark1', max_threads=1 FORMAT Null; +SYSTEM FLUSH LOGS; +SELECT + type, + query_id = initial_query_id AS is_initial_query2, + arraySort(arrayMap(x -> replaceAll(x, currentDatabase(), 'db'), tables)) as tbl, + replaceAll(query, currentDatabase(), 'db') as qry, + log_comment +FROM system.query_log +WHERE + event_time > now() - 300 AND type = 'QueryFinish' AND + initial_query_id IN ( + SELECT initial_query_id + FROM system.query_log + WHERE + event_time > now() - 300 + and log_comment = 'test_tiered_watermark1' + and current_database = currentDatabase() + and query_id = initial_query_id ) +ORDER BY tbl, event_time_microseconds +FORMAT Vertical; -SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark', max_threads=1 FORMAT Null; +SELECT 'Check if the subqueries were recorded in query_log (hybrid_table_auto_cast_columns = 1)'; + +SELECT * FROM test_tiered_watermark ORDER BY id DESC SETTINGS enable_analyzer = 1, hybrid_table_auto_cast_columns = 1, prefer_localhost_replica = 0, log_queries=1, serialize_query_plan=0, log_comment = 'test_tiered_watermark2', max_threads=1 FORMAT Null; SYSTEM FLUSH LOGS; SELECT type, @@ -293,7 +355,7 @@ WHERE FROM system.query_log WHERE event_time > now() - 300 - and log_comment = 'test_tiered_watermark' + and log_comment = 'test_tiered_watermark2' and current_database = currentDatabase() and query_id = initial_query_id ) ORDER BY tbl, event_time_microseconds @@ -305,7 +367,7 @@ DROP TABLE IF EXISTS test_tiered_watermark SYNC; DROP TABLE IF EXISTS test_tiered_watermark_after SYNC; DROP TABLE IF EXISTS test_tiered_watermark_before SYNC; --- TODO: +-- TODO: - addressed by 03644_hybrid_auto_cast.sql -- Code: 70. DB::Exception: Received from localhost:9000. DB::Exception: Conversion from AggregateFunction(sum, Decimal(38, 0)) to AggregateFunction(sum, UInt32) is not supported: while converting source column `sum(__table1.value)` to destination column `sum(__table1.value)`. (CANNOT_CONVERT_TYPE) -- SELECT sum(value) FROM test_tiered_watermark; @@ -326,13 +388,13 @@ DROP TABLE IF EXISTS test_tiered_watermark_before SYNC; -- 1. Integration tests (similar to tests/queries/0_stateless) -- - Base SELECT with date split: part in Distributed, part in S3 -> results should match a manual UNION ALL (with correct ORDER BY/aggregation). 
-- - GROUP BY / ORDER BY / LIMIT: confirm the stage is selected correctly, finalization happens at the top, rows_before_limit_at_least is correct (createLocalPlan already keeps LIMIT). --- - JOIN: with a small table on the initiator; check GLOBAL JOIN scenarios. Ensure remote layers behave the same as remote shard subqueries created through createLocalPlan. --- - skipUnusedShards: with analyzer ensure layer conditions are respected (where FILTER DAG is available). --- - Constants: hostName()/now() in SELECT across several layers -> ensure no discrepancies. --- - EXPLAIN PLAN/PIPELINE: show child plans for layers and remote plans. +-- - JOIN: with a small table on the initiator; check GLOBAL JOIN scenarios. Ensure remote segments behave the same as remote shard subqueries created through createLocalPlan. +-- - skipUnusedShards: with analyzer ensure segment conditions are respected (where FILTER DAG is available). +-- - Constants: hostName()/now() in SELECT across several segments -> ensure no discrepancies. +-- - EXPLAIN PLAN/PIPELINE: show child plans for segments and remote plans. -- - Subqueries in logs. -- - Different column sets/types: supertype in snapshot, converting actions on read. --- - Object columns: same as Distributed — use ColumnsDescriptionByShardNum for layers if needed (optional for local layers; already implemented for Distributed). +-- - Object columns: same as Distributed — use ColumnsDescriptionByShardNum for segments if needed (optional for local segments; already implemented for Distributed). -- Condition with dictGet('a1_watermarks_dict', ...) @@ -342,4 +404,4 @@ DROP TABLE IF EXISTS test_tiered_watermark_before SYNC; -- TODO: -- test for distributed_aggregation_memory_efficient & enable_memory_bound_merging_of_aggregation_results -- to avoid UNKNOWN_AGGREGATED_DATA_VARIANT when mixing different aggregation variants --- from remote shards (with memory_bound) and local layers (without memory_bound) +-- from remote shards (with memory_bound) and local segments (without memory_bound) diff --git a/tests/queries/0_stateless/03644_hybrid_auto_cast.reference b/tests/queries/0_stateless/03644_hybrid_auto_cast.reference new file mode 100644 index 000000000000..869ac32216b6 --- /dev/null +++ b/tests/queries/0_stateless/03644_hybrid_auto_cast.reference @@ -0,0 +1,13 @@ +hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 (headers mismatch) +hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 (headers mismatch) +1 +hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 manual cast +600 +1 +hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 manual cast +600 +1 +hybrid_table_auto_cast_columns = 1, enable_analyzer = 1 +600 +1 +hybrid_table_auto_cast_columns = 1, enable_analyzer = 0 (analizer required) diff --git a/tests/queries/0_stateless/03644_hybrid_auto_cast.sql b/tests/queries/0_stateless/03644_hybrid_auto_cast.sql new file mode 100644 index 000000000000..7248dd9a55ef --- /dev/null +++ b/tests/queries/0_stateless/03644_hybrid_auto_cast.sql @@ -0,0 +1,85 @@ +SET allow_experimental_hybrid_table = 1, + prefer_localhost_replica = 0; + +DROP TABLE IF EXISTS test_tiered_watermark_after; +DROP TABLE IF EXISTS test_tiered_watermark_before; +DROP TABLE IF EXISTS test_tiered_watermark; + +CREATE TABLE test_tiered_watermark_after +( + `id` UInt32, + `name` String, + `date` Date, + `value` UInt64, + `categories` Array(UInt32) +) +ENGINE = MergeTree() +ORDER BY id; + +CREATE TABLE test_tiered_watermark_before +( + `id` Int32, + `name` Nullable(String), + `date` Date, + 
`value` Decimal128(0), + `categories` Array(Int64) +) +ENGINE = MergeTree() +ORDER BY id; + +INSERT INTO test_tiered_watermark_after VALUES + (11, 'Alice', '2025-08-15', 100, [100, 10]), + (12, 'Bob', '2025-08-20', 200, [200, 20]), + (13, 'Charlie', '2025-08-25', 300, [300, 30]), + (14, 'David', '2025-09-05', 400, [400, 40]), + (15, 'Eve', '2025-09-10', 500, [500, 50]), + (16, 'Frank', '2025-09-15', 600, [600, 60]); + +INSERT INTO test_tiered_watermark_before VALUES + (21, 'Alice', '2025-08-15', 100, [100, 10]), + (22, 'Bob', '2025-08-20', 200, [200, 20]), + (23, 'Charlie', '2025-08-25', 300, [300, 30]), + (24, 'David', '2025-09-05', 400, [400, 40]), + (25, 'Eve', '2025-09-10', 500, [500, 50]), + (26, 'Frank', '2025-09-15', 600, [600, 60]); + +CREATE TABLE test_tiered_watermark +ENGINE = Hybrid( + remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_after'), + date >= '2025-09-01', + remote('127.0.0.1:9000', currentDatabase(), 'test_tiered_watermark_before'), + date < '2025-09-01' +); + +-- the problem +SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 (headers mismatch)'; +SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 1; +SELECT max(value) FROM test_tiered_watermark; -- { serverError CANNOT_CONVERT_TYPE } +SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; -- { serverError THERE_IS_NO_COLUMN } + +SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 (headers mismatch)'; +SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 0; +SELECT max(value) FROM test_tiered_watermark; -- { serverError CANNOT_CONVERT_TYPE } +SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; -- works w/o analyzer + +-- workaround - explicit cast +SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 1 manual cast'; +SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 1; +SELECT max(value::UInt32) FROM test_tiered_watermark; +SELECT sum(if(arrayExists(x -> (x IN (10)), categories::Array(UInt32)), 1, 0)) AS x FROM test_tiered_watermark; + +SELECT 'hybrid_table_auto_cast_columns = 0, enable_analyzer = 0 manual cast'; +SET hybrid_table_auto_cast_columns = 0, enable_analyzer = 0; +SELECT max(value::UInt32) FROM test_tiered_watermark; +SELECT sum(if(arrayExists(x -> (x IN (10)), categories::Array(UInt32)), 1, 0)) AS x FROM test_tiered_watermark; + +-- feature to add casts automatically +SELECT 'hybrid_table_auto_cast_columns = 1, enable_analyzer = 1'; +SET hybrid_table_auto_cast_columns = 1, enable_analyzer = 1; +SELECT max(value) FROM test_tiered_watermark; +SELECT sum(if(arrayExists(x -> (x IN (10)), categories), 1, 0)) AS x FROM test_tiered_watermark; + +SELECT 'hybrid_table_auto_cast_columns = 1, enable_analyzer = 0 (analyzer required)'; +SET hybrid_table_auto_cast_columns = 1, enable_analyzer = 0; +SELECT max(value) FROM test_tiered_watermark; -- { serverError CANNOT_CONVERT_TYPE } + diff --git a/tests/queries/0_stateless/03644_hybrid_unqualified_table.reference b/tests/queries/0_stateless/03644_hybrid_unqualified_table.reference new file mode 100644 index 000000000000..98bae7afae05 --- /dev/null +++ b/tests/queries/0_stateless/03644_hybrid_unqualified_table.reference @@ -0,0 +1,3 @@ +Hybrid allows unqualified local tables by default +3 +1 diff --git a/tests/queries/0_stateless/03644_hybrid_unqualified_table.sql b/tests/queries/0_stateless/03644_hybrid_unqualified_table.sql new file mode 100644 index 000000000000..672beea1afc3 --- /dev/null +++ 
b/tests/queries/0_stateless/03644_hybrid_unqualified_table.sql @@ -0,0 +1,33 @@ +SET allow_experimental_hybrid_table = 1; + +SELECT 'Hybrid allows unqualified local tables by default'; + +DROP TABLE IF EXISTS test_hybrid_unqualified_segment SYNC; +DROP TABLE IF EXISTS test_hybrid_unqualified SYNC; + +CREATE TABLE test_hybrid_unqualified_segment +( + `number` UInt64 +) +ENGINE = MergeTree() +ORDER BY tuple(); + +INSERT INTO test_hybrid_unqualified_segment VALUES (10), (20); + +CREATE TABLE test_hybrid_unqualified +( + `number` UInt64 +) +ENGINE = Hybrid( + remote('localhost:9000', system.numbers), number = 0, + test_hybrid_unqualified_segment, number >= 10 +); + +SELECT count() FROM test_hybrid_unqualified; + +SELECT positionCaseInsensitive(engine_full, concat(currentDatabase(), '.test_hybrid_unqualified_segment')) > 0 +FROM system.tables +WHERE database = currentDatabase() AND name = 'test_hybrid_unqualified'; + +DROP TABLE IF EXISTS test_hybrid_unqualified SYNC; +DROP TABLE IF EXISTS test_hybrid_unqualified_segment SYNC;
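-- ---------------------------------------------------------------------------
-- Illustrative sketch, not part of the test files above; the sketch_* table
-- names are hypothetical. The integration-test TODO list earlier asks that a
-- base SELECT with a date split match a manual UNION ALL over the segments.
-- Assuming mutually exclusive watermark predicates, the two queries below are
-- expected to return the same rows, since each row is served by exactly one
-- segment.
SET allow_experimental_hybrid_table = 1;

DROP TABLE IF EXISTS sketch_hybrid SYNC;
DROP TABLE IF EXISTS sketch_hot SYNC;
DROP TABLE IF EXISTS sketch_cold SYNC;

CREATE TABLE sketch_hot (`id` UInt32, `date` Date) ENGINE = MergeTree() ORDER BY id;
CREATE TABLE sketch_cold (`id` UInt32, `date` Date) ENGINE = MergeTree() ORDER BY id;

INSERT INTO sketch_hot VALUES (1, '2025-09-10'), (2, '2025-09-20');
INSERT INTO sketch_cold VALUES (3, '2025-08-10'), (4, '2025-08-20');

CREATE TABLE sketch_hybrid
(
    `id` UInt32,
    `date` Date
)
ENGINE = Hybrid(
    remote('127.0.0.1:9000', currentDatabase(), 'sketch_hot'), date >= '2025-09-01',
    remote('127.0.0.1:9000', currentDatabase(), 'sketch_cold'), date < '2025-09-01'
);

-- Both statements should list ids 1, 2, 3, 4.
SELECT id FROM sketch_hybrid ORDER BY id;

SELECT id
FROM
(
    SELECT id FROM sketch_hot WHERE date >= '2025-09-01'
    UNION ALL
    SELECT id FROM sketch_cold WHERE date < '2025-09-01'
)
ORDER BY id;

DROP TABLE IF EXISTS sketch_hybrid SYNC;
DROP TABLE IF EXISTS sketch_hot SYNC;
DROP TABLE IF EXISTS sketch_cold SYNC;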