From 5af7474ba6ffc3f0dc294800dec755a94f904927 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 17 Sep 2025 11:48:16 +0200 Subject: [PATCH 1/9] Read optimization based on Iceberg metadata --- src/Core/Range.cpp | 55 +++++++ src/Core/Range.h | 5 + src/Disks/ObjectStorages/IObjectStorage.cpp | 36 ++++- src/Disks/ObjectStorages/IObjectStorage.h | 35 +++-- src/Processors/Chunk.cpp | 7 +- .../DataLakes/DataLakeConfiguration.h | 2 +- .../DataLakes/DeltaLakeMetadata.cpp | 4 +- .../DataLakes/DeltaLakeMetadata.h | 6 +- .../DeltaLakeMetadataDeltaKernel.cpp | 2 +- .../DataLakes/DeltaLakeMetadataDeltaKernel.h | 2 +- .../ObjectStorage/DataLakes/HudiMetadata.cpp | 8 +- .../ObjectStorage/DataLakes/HudiMetadata.h | 6 +- .../DataLakes/IDataLakeMetadata.cpp | 99 +++++++++++- .../DataLakes/IDataLakeMetadata.h | 62 +++++++- .../DataLakes/Iceberg/IcebergMetadata.cpp | 9 +- .../DataLakes/Iceberg/IcebergMetadata.h | 6 +- .../StorageObjectStorageSource.cpp | 142 ++++++++++++++++-- .../StorageObjectStorageSource.h | 27 +++- ...rageObjectStorageStableTaskDistributor.cpp | 13 +- src/Storages/prepareReadingFromFormat.cpp | 1 + src/Storages/prepareReadingFromFormat.h | 2 + 21 files changed, 465 insertions(+), 64 deletions(-) diff --git a/src/Core/Range.cpp b/src/Core/Range.cpp index 139fb8db76c9..d15b7df88aaa 100644 --- a/src/Core/Range.cpp +++ b/src/Core/Range.cpp @@ -9,6 +9,12 @@ namespace DB { +namespace ErrorCodes +{ +extern const int INCORRECT_DATA; +}; + + FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_) : Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_) { @@ -151,6 +157,13 @@ bool Range::isInfinite() const return left.isNegativeInfinity() && right.isPositiveInfinity(); } +/// [x, x] +bool Range::isPoint() const +{ + return fullBounded() && left_included && right_included && equals(left, right) + && !left.isNegativeInfinity() && !left.isPositiveInfinity(); +} + bool Range::intersectsRange(const Range & r) const { /// r to the left of me. @@ -332,6 +345,48 @@ String Range::toString() const return str.str(); } +String Range::dump() const +{ + WriteBufferFromOwnString str; + + str << (left_included ? '[' : '(') << left.dump() << ","; + str << right.dump() << (right_included ? 
']' : ')'); + + return str.str(); +} + +void Range::restoreFromDump(const String & range) +{ + if (range.empty()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump"); + + if (range[0] == '[') + left_included = true; + else if (range[0] == '(') + left_included = false; + else + throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect range: {}", range); + + if (range[range.size() - 1] == ']') + right_included = true; + else if (range[range.size() - 1] == ')') + right_included = false; + else + throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect range: {}", range); + + /// TODO: Strings with comma + auto separator = range.find(','); + if (separator == std::string::npos || separator == range.size()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect range: {}", range); + + std::string_view l(range.data() + 1, separator - 1); + std::string_view r(range.data() + separator + 1, range.size() - separator - 2); + + /// TODO: "Decimal64_'1596962100.000000'" can't be parsed for some reason + left = Field::restoreFromDump(l); + right = Field::restoreFromDump(r); +} + Hyperrectangle intersect(const Hyperrectangle & a, const Hyperrectangle & b) { size_t result_size = std::min(a.size(), b.size()); diff --git a/src/Core/Range.h b/src/Core/Range.h index 6072795db0a9..b96253b20e85 100644 --- a/src/Core/Range.h +++ b/src/Core/Range.h @@ -94,6 +94,8 @@ struct Range bool isBlank() const; + bool isPoint() const; + bool intersectsRange(const Range & r) const; bool containsRange(const Range & r) const; @@ -114,6 +116,9 @@ struct Range bool nearByWith(const Range & r) const; String toString() const; + + String dump() const; + void restoreFromDump(const String & range); }; Range intersect(const Range & a, const Range & b); diff --git a/src/Disks/ObjectStorages/IObjectStorage.cpp b/src/Disks/ObjectStorages/IObjectStorage.cpp index 2e674d41e9a3..d126bae13da4 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.cpp +++ b/src/Disks/ObjectStorages/IObjectStorage.cpp @@ -8,6 +8,9 @@ #include #include +/// TODO: move DataFileInfo into separate file +#include + #include #include #include @@ -101,6 +104,28 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings return write_settings; } +RelativePathWithMetadata::RelativePathWithMetadata(const String & task_string, std::optional metadata_) + : metadata(std::move(metadata_)) + , command(task_string) +{ + if (!command.isParsed()) + relative_path = task_string; + else + { + auto file_path = command.getFilePath(); + if (file_path.has_value()) + relative_path = file_path.value(); + file_meta_info = command.getFileMetaInfo(); + } +} + +RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional metadata_) + : metadata(std::move(metadata_)) +{ + relative_path = info.file_path; + file_meta_info = info.file_meta_info; +} + void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file) { if (!metadata) @@ -129,8 +154,12 @@ RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std successfully_parsed = true; + if (json->has("file_path")) + file_path = json->getValue("file_path"); if (json->has("retry_after_us")) retry_after_us = json->getValue("retry_after_us"); + if (json->has("meta_info")) + file_meta_info = std::make_shared(json->getObject("meta_info")); } catch (const Poco::JSON::JSONException &) { /// Not a JSON @@
-138,11 +167,16 @@ RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std } } -std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const +std::string RelativePathWithMetadata::CommandInTaskResponse::toString() const { Poco::JSON::Object json; + + if (file_path.has_value()) + json.set("file_path", file_path.value()); if (retry_after_us.has_value()) json.set("retry_after_us", retry_after_us.value()); + if (file_meta_info.has_value()) + json.set("meta_info", file_meta_info.value()->toJson()); std::ostringstream oss; oss.exceptions(std::ios::failbit); diff --git a/src/Disks/ObjectStorages/IObjectStorage.h b/src/Disks/ObjectStorages/IObjectStorage.h index e4b1d48771d4..8bc553138377 100644 --- a/src/Disks/ObjectStorages/IObjectStorage.h +++ b/src/Disks/ObjectStorages/IObjectStorage.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -101,6 +102,10 @@ struct ObjectMetadata ObjectAttributes attributes; }; +struct DataFileInfo; +class DataFileMetaInfo; +using DataFileMetaInfoPtr = std::shared_ptr; + struct RelativePathWithMetadata { class CommandInTaskResponse @@ -109,31 +114,35 @@ struct RelativePathWithMetadata CommandInTaskResponse() = default; explicit CommandInTaskResponse(const std::string & task); - bool is_parsed() const { return successfully_parsed; } - void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; } + bool isParsed() const { return successfully_parsed; } + void setFilePath(const std::string & file_path_ ) { file_path = file_path_; } + void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; } + void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_ ) { file_meta_info = file_meta_info_; } + + std::string toString() const; + + std::optional getFilePath() const { return file_path; } - std::string to_string() const; + std::optional getRetryAfterUs() const { return retry_after_us; } - std::optional get_retry_after_us() const { return retry_after_us; } + std::optional getFileMetaInfo() const { return file_meta_info; } private: bool successfully_parsed = false; + std::optional file_path; std::optional retry_after_us; + std::optional file_meta_info; }; String relative_path; std::optional metadata; CommandInTaskResponse command; + std::optional file_meta_info; RelativePathWithMetadata() = default; - explicit RelativePathWithMetadata(const String & task_string, std::optional metadata_ = std::nullopt) - : metadata(std::move(metadata_)) - , command(task_string) - { - if (!command.is_parsed()) - relative_path = task_string; - } + explicit RelativePathWithMetadata(const String & task_string, std::optional metadata_ = std::nullopt); + explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional metadata_ = std::nullopt); virtual ~RelativePathWithMetadata() = default; @@ -143,6 +152,10 @@ struct RelativePathWithMetadata virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); } + void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_ ) { file_meta_info = file_meta_info_; } + void setFileMetaInfo(std::optional file_meta_info_ ) { file_meta_info = file_meta_info_; } + std::optional getFileMetaInfo() const { return file_meta_info; } + void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file); const CommandInTaskResponse & getCommand() const { return command; } }; diff --git 
a/src/Processors/Chunk.cpp b/src/Processors/Chunk.cpp index e27762f53dc4..34402cd58249 100644 --- a/src/Processors/Chunk.cpp +++ b/src/Processors/Chunk.cpp @@ -108,7 +108,12 @@ void Chunk::addColumn(ColumnPtr column) void Chunk::addColumn(size_t position, ColumnPtr column) { - if (position >= columns.size()) + if (position == columns.size()) + { + addColumn(column); + return; + } + if (position > columns.size()) throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND, "Position {} out of bound in Chunk::addColumn(), max position = {}", position, !columns.empty() ? columns.size() - 1 : 0); diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h index 4768936e3627..d70a05e6a325 100644 --- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h +++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h @@ -104,7 +104,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl return std::nullopt; auto data_files = current_metadata->getDataFiles(); if (!data_files.empty()) - return data_files[0]; + return data_files[0].file_path; return std::nullopt; } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp index e640e467c6d9..8a6ce493e203 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp @@ -158,7 +158,7 @@ struct DeltaLakeMetadataImpl struct DeltaLakeMetadata { NamesAndTypesList schema; - Strings data_files; + DataFileInfos data_files; DeltaLakePartitionColumns partition_columns; }; @@ -195,7 +195,7 @@ struct DeltaLakeMetadataImpl processMetadataFile(key, current_schema, current_partition_columns, result_files); } - return DeltaLakeMetadata{current_schema, Strings(result_files.begin(), result_files.end()), current_partition_columns}; + return DeltaLakeMetadata{current_schema, DataFileInfos(result_files.begin(), result_files.end()), current_partition_columns}; } /** diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h index a3cf16cd6673..c46e91d9dcbe 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h @@ -35,7 +35,7 @@ class DeltaLakeMetadata final : public IDataLakeMetadata DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); - Strings getDataFiles() const override { return data_files; } + DataFileInfos getDataFiles() const override { return data_files; } NamesAndTypesList getTableSchema() const override { return schema; } @@ -67,12 +67,12 @@ class DeltaLakeMetadata final : public IDataLakeMetadata ContextPtr context) const override; private: - mutable Strings data_files; + mutable DataFileInfos data_files; NamesAndTypesList schema; DeltaLakePartitionColumns partition_columns; ObjectStoragePtr object_storage; - Strings getDataFiles(const ActionsDAG *) const { return data_files; } + DataFileInfos getDataFiles(const ActionsDAG *) const { return data_files; } }; } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp index e54bc89e3a39..838ec22b4547 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp 
@@ -35,7 +35,7 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &) return table_snapshot->update(); } -Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const +DataFileInfos DeltaLakeMetadataDeltaKernel::getDataFiles() const { throwNotImplemented("getDataFiles()"); } diff --git a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h index cfc57b791040..3bee4a028414 100644 --- a/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h +++ b/src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h @@ -33,7 +33,7 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata bool update(const ContextPtr & context) override; - Strings getDataFiles() const override; + DataFileInfos getDataFiles() const override; NamesAndTypesList getTableSchema() const override; diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp index 6398ad34a4b6..a1f97a00038a 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp @@ -40,7 +40,7 @@ namespace ErrorCodes * hoodie.parquet.max.file.size option. Once a single Parquet file is too large, Hudi creates a second file group. * Each file group is identified by File Id. */ -Strings HudiMetadata::getDataFilesImpl() const +DataFileInfos HudiMetadata::getDataFilesImpl() const { auto configuration_ptr = configuration.lock(); auto log = getLogger("HudiMetadata"); @@ -76,12 +76,12 @@ Strings HudiMetadata::getDataFilesImpl() const } } - Strings result; + DataFileInfos result; for (auto & [partition, partition_data] : files) { LOG_TRACE(log, "Adding {} data files from partition {}", partition, partition_data.size()); for (auto & [file_id, file_data] : partition_data) - result.push_back(std::move(file_data.key)); + result.push_back(DataFileInfo(std::move(file_data.key))); } return result; } @@ -91,7 +91,7 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv { } -Strings HudiMetadata::getDataFiles() const +DataFileInfos HudiMetadata::getDataFiles() const { if (data_files.empty()) data_files = getDataFilesImpl(); diff --git a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h index 2c23269b928a..589f3906d7f5 100644 --- a/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/HudiMetadata.h @@ -19,7 +19,7 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_); - Strings getDataFiles() const override; + DataFileInfos getDataFiles() const override; NamesAndTypesList getTableSchema() const override { return {}; } @@ -49,9 +49,9 @@ class HudiMetadata final : public IDataLakeMetadata, private WithContext private: const ObjectStoragePtr object_storage; const ConfigurationObserverPtr configuration; - mutable Strings data_files; + mutable DataFileInfos data_files; - Strings getDataFilesImpl() const; + DataFileInfos getDataFilesImpl() const; }; } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index df4f5ed3a45b..a4bbd242e2b4 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -1,5 +1,8 @@ #include 
"IDataLakeMetadata.h" #include +#include + +#include namespace DB { @@ -11,7 +14,7 @@ class KeysIterator : public IObjectIterator { public: KeysIterator( - Strings && data_files_, + DataFileInfos && data_files_, ObjectStoragePtr object_storage_, IDataLakeMetadata::FileProgressCallback callback_) : data_files(data_files_) @@ -33,7 +36,7 @@ class KeysIterator : public IObjectIterator if (current_index >= data_files.size()) return nullptr; - auto key = data_files[current_index]; + auto key = data_files[current_index].file_path; if (callback) { @@ -43,12 +46,14 @@ class KeysIterator : public IObjectIterator callback(FileProgress(0, 1)); } - return std::make_shared(key, std::nullopt); + auto result = std::make_shared(key, std::nullopt); + result->setFileMetaInfo(data_files[current_index].file_meta_info); + return result; } } private: - Strings data_files; + DataFileInfos data_files; ObjectStoragePtr object_storage; std::atomic index = 0; IDataLakeMetadata::FileProgressCallback callback; @@ -57,7 +62,7 @@ class KeysIterator : public IObjectIterator } ObjectIterator IDataLakeMetadata::createKeysIterator( - Strings && data_files_, + DataFileInfos && data_files_, ObjectStoragePtr object_storage_, IDataLakeMetadata::FileProgressCallback callback_) const { @@ -73,4 +78,88 @@ DB::ReadFromFormatInfo IDataLakeMetadata::prepareReadingFromFormat( return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns); } +DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info) +{ + if (!file_info) + return; + + auto log = getLogger("DataFileMetaInfo"); + + if (file_info->has("columns")) + { + auto columns = file_info->getArray("columns"); + for (size_t i = 0; i < columns->size(); ++i) + { + auto column = columns->getObject(static_cast(i)); + + Int32 id; + if (column->has("id")) + id = column->get("id"); + else + { + LOG_WARNING(log, "Can't read column id, ignored"); + continue; + } + + DB::DataFileMetaInfo::ColumnInfo column_info; + if (column->has("rows")) + column_info.rows_count = column->get("rows"); + if (column->has("nulls")) + column_info.nulls_count = column->get("nulls"); + if (column->has("range")) + { + Range range(""); + std::string r = column->get("range"); + try + { + range.restoreFromDump(r); + column_info.hyperrectangle = std::move(range); + } + catch (const Exception & e) + { + LOG_WARNING(log, "Can't read range for column {}, range '{}' ignored, error: {}", id, r, e.what()); + } + } + + columns_info[id] = column_info; + } + } +} + +DataFileMetaInfo::DataFileMetaInfo(const std::unordered_map & columns_info_) +{ + for (const auto & column : columns_info_) + { + columns_info[column.first] = {column.second.rows_count, column.second.nulls_count, column.second.hyperrectangle}; + } +} + +Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const +{ + Poco::JSON::Object::Ptr file_info = new Poco::JSON::Object(); + + if (!columns_info.empty()) + { + Poco::JSON::Array::Ptr columns = new Poco::JSON::Array(); + + for (const auto & column : columns_info) + { + Poco::JSON::Object::Ptr column_info = new Poco::JSON::Object(); + column_info->set("id", column.first); + if (column.second.rows_count.has_value()) + column_info->set("rows", column.second.rows_count.value()); + if (column.second.nulls_count.has_value()) + column_info->set("nulls", column.second.nulls_count.value()); + if (column.second.hyperrectangle.has_value()) + column_info->set("range", column.second.hyperrectangle.value().dump()); + + columns->add(column_info); + } + + 
file_info->set("columns", columns); + } + + return file_info; +} + } diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 049a1fa8d60a..a34fea172ab6 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -1,10 +1,19 @@ #pragma once #include #include +#include #include -#include "Interpreters/ActionsDAG.h" +#include #include #include +#include + +namespace Iceberg +{ + +struct ColumnInfo; + +}; namespace DB { @@ -12,7 +21,52 @@ namespace DB namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; -} +}; + +class DataFileMetaInfo +{ +public: + DataFileMetaInfo() = default; + + // Extract metadata from Iceberg structure + explicit DataFileMetaInfo(const std::unordered_map & columns_info_); + + // Deserialize from json in distributed requests + explicit DataFileMetaInfo(const Poco::JSON::Object::Ptr file_info); + + // Serialize to json in distributed requests + Poco::JSON::Object::Ptr toJson() const; + + struct ColumnInfo + { + std::optional rows_count; + std::optional nulls_count; + std::optional hyperrectangle; + }; + + std::unordered_map columns_info; +}; + +using DataFileMetaInfoPtr = std::shared_ptr; + +struct DataFileInfo +{ + std::string file_path; + std::optional file_meta_info; + + explicit DataFileInfo(const std::string & file_path_) + : file_path(file_path_) {} + + explicit DataFileInfo(std::string && file_path_) + : file_path(std::move(file_path_)) {} + + bool operator==(const DataFileInfo & rhs) const + { + return file_path == rhs.file_path; + } +}; + +using DataFileInfos = std::vector; class IDataLakeMetadata : boost::noncopyable { @@ -23,7 +77,7 @@ class IDataLakeMetadata : boost::noncopyable /// List all data files. /// For better parallelization, iterate() method should be used. - virtual Strings getDataFiles() const = 0; + virtual DataFileInfos getDataFiles() const = 0; /// Return iterator to `data files`. 
using FileProgressCallback = std::function; virtual ObjectIterator iterate( @@ -62,7 +116,7 @@ class IDataLakeMetadata : boost::noncopyable protected: ObjectIterator createKeysIterator( - Strings && data_files_, + DataFileInfos && data_files_, ObjectStoragePtr object_storage_, IDataLakeMetadata::FileProgressCallback callback_) const; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index b8b26e9765d2..eba96a2bb4a9 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -896,7 +896,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const return create_fn(); } -Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const +DataFileInfos IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const { bool use_partition_pruning = filter_dag && local_context->getSettingsRef()[Setting::use_iceberg_partition_pruning]; @@ -906,7 +906,7 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, Context return cached_unprunned_files_for_last_processed_snapshot.value(); } - Strings data_files; + DataFileInfos data_files; { SharedLockGuard lock(mutex); @@ -928,7 +928,10 @@ Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, Context if (!pruner.canBePruned(manifest_file_entry)) { if (std::holds_alternative(manifest_file_entry.file)) - data_files.push_back(std::get(manifest_file_entry.file).file_name); + { + data_files.push_back(DataFileInfo(std::get(manifest_file_entry.file).file_name)); + data_files.back().file_meta_info = std::make_shared(manifest_file_entry.columns_infos); + } } } } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index 0a6d16112625..f782f529f566 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -48,7 +48,7 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext /// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files. /// All subsequent calls when the same data snapshot is relevant will return saved list of files (because it cannot be changed /// without changing metadata file). Drops on every snapshot update. - Strings getDataFiles() const override { return getDataFilesImpl(nullptr, getContext()); } + DataFileInfos getDataFiles() const override { return getDataFilesImpl(nullptr, getContext()); } /// Get table schema parsed from metadata. 
NamesAndTypesList getTableSchema() const override; @@ -112,11 +112,11 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext Int64 relevant_snapshot_id TSA_GUARDED_BY(mutex) {-1}; const String table_location; - mutable std::optional cached_unprunned_files_for_last_processed_snapshot TSA_GUARDED_BY(cached_unprunned_files_for_last_processed_snapshot_mutex); + mutable std::optional cached_unprunned_files_for_last_processed_snapshot TSA_GUARDED_BY(cached_unprunned_files_for_last_processed_snapshot_mutex); mutable std::mutex cached_unprunned_files_for_last_processed_snapshot_mutex; void updateState(const ContextPtr & local_context, Poco::JSON::Object::Ptr metadata_object, bool metadata_file_changed) TSA_REQUIRES(mutex); - Strings getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const; + DataFileInfos getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const; void updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) TSA_REQUIRES(mutex); ManifestFileCacheKeys getManifestList(ContextPtr local_context, const String & filename) const; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 2f21601a984a..df62062dfdb6 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -274,7 +274,6 @@ void StorageObjectStorageSource::lazyInitialize() Chunk StorageObjectStorageSource::generate() { - lazyInitialize(); while (true) @@ -331,6 +330,13 @@ Chunk StorageObjectStorageSource::generate() .etag = &(object_info->metadata->etag)}, read_context); + for (const auto & constant_column : reader.constant_columns_with_values) + { + chunk.addColumn(constant_column.first, + constant_column.second.name_and_type.type->createColumnConst( + chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst()); + } + if (chunk_size && chunk.hasColumns()) { const auto * object_with_partition_columns_info = dynamic_cast(object_info.get()); @@ -479,9 +485,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (!object_info) return {}; - if (object_info->getCommand().is_parsed()) + if (object_info->getCommand().isParsed()) { - auto retry_after_us = object_info->getCommand().get_retry_after_us(); + auto retry_after_us = object_info->getCommand().getRetryAfterUs(); if (retry_after_us.has_value()) { not_a_path = true; @@ -524,6 +530,81 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade return schema_cache->tryGetNumRows(cache_key, get_last_mod_time); }; + /// List of columns with constant value in current file, and values + std::map constant_columns_with_values; + std::unordered_set constant_columns; + + std::unordered_map> requested_columns_list; + { + size_t column_index = 0; + for (const auto & column : read_from_format_info.requested_columns) + requested_columns_list[column.getNameInStorage()] = std::make_pair(column_index++, column); + } + + std::unordered_map physical_columns_names; + Int32 column_counter = 0; + /// In Iceberg metadata columns' numbers starts from 1, so preincrement used + for (const auto & column : read_from_format_info.physical_columns) + physical_columns_names[++column_counter] = column.getNameInStorage(); + /// now column_counter contains maximum column index + + auto file_meta_data = object_info->getFileMetaInfo(); + if (file_meta_data.has_value()) + { + for (const auto & 
column : file_meta_data.value()->columns_info) + { + if (column.second.hyperrectangle.has_value()) + { + if (column.second.hyperrectangle.value().isPoint()) + { + auto column_id = column.first; + + if (column_id <= 0 || column_id > column_counter) + { /// Something wrong, ignore file metadata + LOG_WARNING(log, "Incorrect column ID: {}, ignoring file metadata", column_id); + constant_columns.clear(); + break; + } + + const auto & column_name = physical_columns_names[column_id]; + + auto i_column = requested_columns_list.find(column_name); + if (i_column == requested_columns_list.end()) + continue; + + /// isPoint() method checks that left==right + constant_columns_with_values[i_column->second.first] = + ConstColumnWithValue{ + i_column->second.second, + column.second.hyperrectangle.value().left + }; + constant_columns.insert(column_name); + + LOG_DEBUG(log, "In file {} constant column {} with value {}", + object_info->getPath(), column_name, column.second.hyperrectangle.value().left.dump()); + } + } + } + } + + NamesAndTypesList requested_columns_copy = read_from_format_info.requested_columns; + + if (!constant_columns.empty()) + { + size_t original_columns = requested_columns_copy.size(); + requested_columns_copy = requested_columns_copy.eraseNames(constant_columns); + if (requested_columns_copy.size() + constant_columns.size() != original_columns) + { + LOG_WARNING(log, "Can't remove constant columns for file {} correctly, falling back to full read. Found constant columns: [{}]", + object_info->getPath(), constant_columns); + requested_columns_copy = read_from_format_info.requested_columns; + constant_columns.clear(); + constant_columns_with_values.clear(); + } + else if (requested_columns_copy.empty()) + need_only_count = true; + } + std::optional num_rows_from_cache = need_only_count && context_->getSettingsRef()[Setting::use_cache_for_count_from_files] ? try_get_num_rows_from_cache() : std::nullopt; @@ -564,7 +645,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade initial_header = sample_header; } - auto input_format = FormatFactory::instance().getInput( configuration->getFormat(), *read_buf, @@ -600,7 +680,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade }); } - if (read_from_format_info.columns_description.hasDefaults()) { builder.addSimpleTransform( @@ -617,7 +696,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade /// from chunk read by IInputFormat.
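+ /// Columns detected as constant via Iceberg metadata were stripped from requested_columns_copy above; generate() re-adds them to each chunk as materialized const columns.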
builder.addSimpleTransform([&](const Block & header) { - return std::make_shared(header, read_from_format_info.requested_columns); + return std::make_shared(header, requested_columns_copy); }); auto pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); @@ -626,7 +705,12 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade ProfileEvents::increment(ProfileEvents::EngineFileLikeReadFiles); return ReaderHolder( - object_info, std::move(read_buf), std::move(source), std::move(pipeline), std::move(current_reader)); + object_info, + std::move(read_buf), + std::move(source), + std::move(pipeline), + std::move(current_reader), + std::move(constant_columns_with_values)); } std::future StorageObjectStorageSource::createReaderAsync() @@ -965,6 +1049,24 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne return object_infos[index++]; } +StorageObjectStorageSource::KeysIterator::KeysIterator( + const DataFileInfos & file_infos_, + ObjectStoragePtr object_storage_, + const NamesAndTypesList & virtual_columns_, + ObjectInfos * read_keys_, + bool ignore_non_existent_files_, + bool skip_object_metadata_, + std::function file_progress_callback_) + : object_storage(object_storage_) + , virtual_columns(virtual_columns_) + , file_progress_callback(file_progress_callback_) + , file_infos(file_infos_) + , ignore_non_existent_files(ignore_non_existent_files_) + , skip_object_metadata(skip_object_metadata_) +{ + fillKeys(read_keys_); +} + StorageObjectStorageSource::KeysIterator::KeysIterator( const Strings & keys_, ObjectStoragePtr object_storage_, @@ -976,16 +1078,21 @@ StorageObjectStorageSource::KeysIterator::KeysIterator( : object_storage(object_storage_) , virtual_columns(virtual_columns_) , file_progress_callback(file_progress_callback_) - , keys(keys_) + , file_infos(keys_.begin(), keys_.end()) , ignore_non_existent_files(ignore_non_existent_files_) , skip_object_metadata(skip_object_metadata_) +{ + fillKeys(read_keys_); +} + +void StorageObjectStorageSource::KeysIterator::fillKeys(ObjectInfos * read_keys_) { if (read_keys_) { /// TODO: should we add metadata if we anyway fetch it if file_progress_callback is passed? 
- for (auto && key : keys) + for (auto && file_info : file_infos) { - auto object_info = std::make_shared(key); + auto object_info = std::make_shared(file_info.file_path); read_keys_->emplace_back(object_info); } } @@ -996,29 +1103,29 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::KeysIterator::ne while (true) { size_t current_index = index.fetch_add(1, std::memory_order_relaxed); - if (current_index >= keys.size()) + if (current_index >= file_infos.size()) return nullptr; - auto key = keys[current_index]; + auto file_info = file_infos[current_index]; ObjectMetadata object_metadata{}; if (!skip_object_metadata) { if (ignore_non_existent_files) { - auto metadata = object_storage->tryGetObjectMetadata(key); + auto metadata = object_storage->tryGetObjectMetadata(file_info.file_path); if (!metadata) continue; object_metadata = *metadata; } else - object_metadata = object_storage->getObjectMetadata(key); + object_metadata = object_storage->getObjectMetadata(file_info.file_path); } if (file_progress_callback) file_progress_callback(FileProgress(0, object_metadata.size_bytes)); - return std::make_shared(key, object_metadata); + return std::make_shared(file_info, object_metadata); } } @@ -1027,12 +1134,14 @@ StorageObjectStorageSource::ReaderHolder::ReaderHolder( std::unique_ptr read_buf_, std::shared_ptr source_, std::unique_ptr pipeline_, - std::unique_ptr reader_) + std::unique_ptr reader_, + std::map && constant_columns_with_values_) : object_info(std::move(object_info_)) , read_buf(std::move(read_buf_)) , source(std::move(source_)) , pipeline(std::move(pipeline_)) , reader(std::move(reader_)) + , constant_columns_with_values(std::move(constant_columns_with_values_)) { } @@ -1046,6 +1155,7 @@ StorageObjectStorageSource::ReaderHolder::operator=(ReaderHolder && other) noexc source = std::move(other.source); read_buf = std::move(other.read_buf); object_info = std::move(other.object_info); + constant_columns_with_values = std::move(other.constant_columns_with_values); return *this; } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h index 6b08b02f9245..ae50c3bb3bda 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h @@ -96,6 +96,12 @@ class StorageObjectStorageSource : public SourceWithKeyCondition size_t total_rows_in_file = 0; LoggerPtr log = getLogger("StorageObjectStorageSource"); + struct ConstColumnWithValue + { + NameAndTypePair name_and_type; + Field value; + }; + struct ReaderHolder : private boost::noncopyable { public: @@ -104,7 +110,8 @@ class StorageObjectStorageSource : public SourceWithKeyCondition std::unique_ptr read_buf_, std::shared_ptr source_, std::unique_ptr pipeline_, - std::unique_ptr reader_); + std::unique_ptr reader_, + std::map && constant_columns_with_values_); ReaderHolder() = default; ReaderHolder(ReaderHolder && other) noexcept { *this = std::move(other); } @@ -123,6 +130,9 @@ class StorageObjectStorageSource : public SourceWithKeyCondition std::shared_ptr source; std::unique_ptr pipeline; std::unique_ptr reader; + + public: + std::map constant_columns_with_values; }; ReaderHolder reader; @@ -240,6 +250,15 @@ class StorageObjectStorageSource::GlobIterator : public IObjectIterator, WithCon class StorageObjectStorageSource::KeysIterator : public IObjectIterator { public: + KeysIterator( + const DataFileInfos & file_infos_, + ObjectStoragePtr object_storage_, + const NamesAndTypesList & 
virtual_columns_, + ObjectInfos * read_keys_, + bool ignore_non_existent_files_, + bool skip_object_metadata_, + std::function file_progress_callback = {}); + KeysIterator( const Strings & keys_, ObjectStoragePtr object_storage_, @@ -253,13 +272,15 @@ class StorageObjectStorageSource::KeysIterator : public IObjectIterator ObjectInfoPtr next(size_t processor) override; - size_t estimatedKeysCount() override { return keys.size(); } + size_t estimatedKeysCount() override { return file_infos.size(); } private: + void fillKeys(ObjectInfos * read_keys_); + const ObjectStoragePtr object_storage; const NamesAndTypesList virtual_columns; const std::function file_progress_callback; - const std::vector keys; + const DataFileInfos file_infos; std::atomic index = 0; bool ignore_non_existent_files; bool skip_object_metadata; diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp index 78652fd803e3..a08b7f8c91b1 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -170,6 +170,15 @@ std::optional StorageObjectStorageStableTaskDistributor::getMatchingFile file_path = object_info->getPath(); } + auto file_meta_info = object_info->getFileMetaInfo(); + if (file_meta_info.has_value()) + { + RelativePathWithMetadata::CommandInTaskResponse response; + response.setFilePath(file_path); + response.setFileMetaInfo(file_meta_info.value()); + file_path = response.toString(); + } + size_t file_replica_idx = getReplicaForFile(file_path); if (file_replica_idx == number_of_current_replica) { @@ -242,8 +251,8 @@ std::optional StorageObjectStorageStableTaskDistributor::getAnyUnprocess /// All unprocessed files owned by alive replicas with recent activity /// Need to retry after (oldest_activity - activity_limit) microseconds RelativePathWithMetadata::CommandInTaskResponse response; - response.set_retry_after_us(oldest_activity - activity_limit); - return response.to_string(); + response.setRetryAfterUs(oldest_activity - activity_limit); + return response.toString(); } return std::nullopt; diff --git a/src/Storages/prepareReadingFromFormat.cpp b/src/Storages/prepareReadingFromFormat.cpp index 5a99021221af..5033eed04c2d 100644 --- a/src/Storages/prepareReadingFromFormat.cpp +++ b/src/Storages/prepareReadingFromFormat.cpp @@ -88,6 +88,7 @@ ReadFromFormatInfo prepareReadingFromFormat( /// Create header for InputFormat with columns that will be read from the data. info.format_header = storage_snapshot->getSampleBlockForColumns(info.columns_description.getNamesOfPhysical()); info.serialization_hints = getSerializationHintsForFileLikeStorage(storage_snapshot->metadata, context); + info.physical_columns = storage_snapshot->metadata->getColumns().getAllPhysical(); return info; } diff --git a/src/Storages/prepareReadingFromFormat.h b/src/Storages/prepareReadingFromFormat.h index a5009eff89f2..45383d6fe3b5 100644 --- a/src/Storages/prepareReadingFromFormat.h +++ b/src/Storages/prepareReadingFromFormat.h @@ -26,6 +26,8 @@ namespace DB SerializationInfoByName serialization_hints; /// The list of hive partition columns. It shall be read from the path regardless if it is present in the file NamesAndTypesList hive_partition_columns_to_read_from_file_path; + /// The list of all physical columns in the source. Sometimes required for read optimizations.
+ NamesAndTypesList physical_columns; }; struct PrepareReadingFromFormatHiveParams From de7545d31b2edb50b712e746199001eacd30a687 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 17 Sep 2025 15:57:29 +0200 Subject: [PATCH 2/9] Better range serialization --- src/Core/Range.cpp | 38 +++++-------------- src/Core/Range.h | 4 +- .../DataLakes/IDataLakeMetadata.cpp | 4 +- 3 files changed, 14 insertions(+), 32 deletions(-) diff --git a/src/Core/Range.cpp b/src/Core/Range.cpp index d15b7df88aaa..2f941d2e5840 100644 --- a/src/Core/Range.cpp +++ b/src/Core/Range.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -345,46 +346,27 @@ String Range::toString() const return str.str(); } -String Range::dump() const +String Range::serialize() const { WriteBufferFromOwnString str; - str << (left_included ? '[' : '(') << left.dump() << ","; - str << right.dump() << (right_included ? ']' : ')'); + str << left_included << right_included; + writeFieldBinary(left, str); + writeFieldBinary(right, str); return str.str(); } -void Range::restoreFromDump(const String & range) +void Range::deserialize(const String & range) { if (range.empty()) throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump"); - if (range[0] == '[') - left_included = true; - else if (range[0] == '(') - left_included = false; - else - throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect range: {}", range); - - if (range[range.size() - 1] == ']') - right_included = true; - else if (range[range.size() - 1] == ')') - right_included = false; - else - throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect range: {}", range); - - /// TODO: Strings with comma - auto separator = range.find(','); - if (separator == std::string::npos || separator == range.size()) - throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect range: {}", range); - - std::string_view l(range.data() + 1, separator - 1); - std::string_view r(range.data() + separator + 1, range.size() - separator - 2); + ReadBufferFromOwnString str(range); - /// TODO: "Decimal64_'1596962100.000000'" can't be parsed for some reason - left = Field::restoreFromDump(l); - right = Field::restoreFromDump(r); + str >> left_included >> right_included; + left = readFieldBinary(str); + right = readFieldBinary(str); } Hyperrectangle intersect(const Hyperrectangle & a, const Hyperrectangle & b) diff --git a/src/Core/Range.h b/src/Core/Range.h index b96253b20e85..12463d448945 100644 --- a/src/Core/Range.h +++ b/src/Core/Range.h @@ -117,8 +117,8 @@ struct Range String toString() const; - String dump() const; - void restoreFromDump(const String & range); + String serialize() const; + void deserialize(const String & range); }; Range intersect(const Range & a, const Range & b); diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index a4bbd242e2b4..2076d75f6fc7 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -112,7 +112,7 @@ DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info) std::string r = column->get("range"); try { - range.restoreFromDump(r); + range.deserialize(r); column_info.hyperrectangle = std::move(range); } catch (const Exception & e) @@ -151,7 +151,7 @@ Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const if (column.second.nulls_count.has_value())
column_info->set("nulls", column.second.nulls_count.value()); if (column.second.hyperrectangle.has_value()) - column_info->set("range", column.second.hyperrectangle.value().dump()); + column_info->set("range", column.second.hyperrectangle.value().serialize()); columns->add(column_info); } From dd9809407f25055bd764b0e017e69dc2cb0a10fe Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 17 Sep 2025 16:40:39 +0200 Subject: [PATCH 3/9] Setting allow_experimental_iceberg_read_optimization, 0 by default --- src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 1 + .../DataLakes/Iceberg/IcebergMetadata.cpp | 5 +- .../StorageObjectStorageSource.cpp | 97 ++++++++++--------- 4 files changed, 61 insertions(+), 45 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 44141270985f..6b7695b254a8 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6905,6 +6905,9 @@ Allow retries in cluster request, when one node goes offline )", EXPERIMENTAL) \ DECLARE(Bool, object_storage_remote_initiator, false, R"( Execute request to object storage as remote on one of object_storage_cluster nodes. +)", EXPERIMENTAL) \ + DECLARE(Bool, allow_experimental_iceberg_read_optimization, false, R"( +Allow Iceberg read optimization based on Iceberg metadata. )", EXPERIMENTAL) \ \ /** Experimental timeSeries* aggregate functions. */ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 8dbfaff5c02f..8ec6e7b9e92c 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -79,6 +79,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, + {"allow_experimental_iceberg_read_optimization", false, false, "New setting."} }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index f8d25f162211..4ccc01a36907 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -60,6 +60,7 @@ extern const SettingsInt64 iceberg_timestamp_ms; extern const SettingsInt64 iceberg_snapshot_id; extern const SettingsBool use_iceberg_metadata_files_cache; extern const SettingsBool use_iceberg_partition_pruning; +extern const SettingsBool allow_experimental_iceberg_read_optimization; } @@ -1091,6 +1092,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(ContextPtr local_context, const DataFileInfos IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, ContextPtr local_context) const { bool use_partition_pruning = filter_dag && local_context->getSettingsRef()[Setting::use_iceberg_partition_pruning]; + bool use_iceberg_read_optimization = local_context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]; { std::lock_guard cache_lock(cached_unprunned_files_for_last_processed_snapshot_mutex); @@ -1122,7 +1124,8 @@ DataFileInfos IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, C if (std::holds_alternative(manifest_file_entry.file)) { data_files.push_back(DataFileInfo(std::get(manifest_file_entry.file).file_name)); - data_files.back().file_meta_info = std::make_shared(manifest_file_entry.columns_infos); + if 
(use_iceberg_read_optimization) + data_files.back().file_meta_info = std::make_shared(manifest_file_entry.columns_infos); } } } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 47bf895687c1..787191695990 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -66,6 +66,7 @@ namespace Setting extern const SettingsBool cluster_function_process_archive_on_multiple_nodes; extern const SettingsBool table_engine_read_through_distributed_cache; extern const SettingsBool use_object_storage_list_objects_cache; + extern const SettingsBool allow_experimental_iceberg_read_optimization; } namespace ErrorCodes @@ -279,6 +280,8 @@ Chunk StorageObjectStorageSource::generate() { lazyInitialize(); + bool use_iceberg_read_optimization = read_context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]; + while (true) { if (isCancelled() || !reader) @@ -333,11 +336,14 @@ Chunk StorageObjectStorageSource::generate() .etag = &(object_info->metadata->etag)}, read_context); - for (const auto & constant_column : reader.constant_columns_with_values) + if (use_iceberg_read_optimization) { - chunk.addColumn(constant_column.first, - constant_column.second.name_and_type.type->createColumnConst( - chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst()); + for (const auto & constant_column : reader.constant_columns_with_values) + { + chunk.addColumn(constant_column.first, + constant_column.second.name_and_type.type->createColumnConst( + chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst()); + } } if (chunk_size && chunk.hasColumns()) @@ -551,61 +557,64 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade physical_columns_names[++column_counter] = column.getNameInStorage(); /// now column_counter contains maximum column index - auto file_meta_data = object_info->getFileMetaInfo(); - if (file_meta_data.has_value()) + NamesAndTypesList requested_columns_copy = read_from_format_info.requested_columns; + + if (context_->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]) { - for (const auto & column : file_meta_data.value()->columns_info) + auto file_meta_data = object_info->getFileMetaInfo(); + if (file_meta_data.has_value()) { - if (column.second.hyperrectangle.has_value()) + for (const auto & column : file_meta_data.value()->columns_info) { - if (column.second.hyperrectangle.value().isPoint()) + if (column.second.hyperrectangle.has_value()) { - auto column_id = column.first; + if (column.second.hyperrectangle.value().isPoint()) + { + auto column_id = column.first; - if (column_id <= 0 || column_id > column_counter) - { /// Something wrong, ignore file metadata - LOG_WARNING(log, "Incorrect column ID: {}, ignoring file metadata", column_id); - constant_columns.clear(); - break; - } + if (column_id <= 0 || column_id > column_counter) + { /// Something wrong, ignore file metadata + LOG_WARNING(log, "Incorrect column ID: {}, ignoring file metadata", column_id); + constant_columns.clear(); + break; + } - const auto & column_name = physical_columns_names[column_id]; + const auto & column_name = physical_columns_names[column_id]; - auto i_column = requested_columns_list.find(column_name); - if (i_column == requested_columns_list.end()) - continue; + auto i_column = requested_columns_list.find(column_name); + if (i_column == 
requested_columns_list.end()) + continue; - /// isPoint() method checks that left==right - constant_columns_with_values[i_column->second.first] = - ConstColumnWithValue{ - i_column->second.second, - column.second.hyperrectangle.value().left - }; - constant_columns.insert(column_name); + /// isPoint() method checks that left==right + constant_columns_with_values[i_column->second.first] = + ConstColumnWithValue{ + i_column->second.second, + column.second.hyperrectangle.value().left + }; + constant_columns.insert(column_name); - LOG_DEBUG(log, "In file {} constant column {} with value {}", - object_info->getPath(), column_name, column.second.hyperrectangle.value().left.dump()); + LOG_DEBUG(log, "In file {} constant column {} with value {}", + object_info->getPath(), column_name, column.second.hyperrectangle.value().left.dump()); + } } } } - } - - NamesAndTypesList requested_columns_copy = read_from_format_info.requested_columns; - if (!constant_columns.empty()) - { - size_t original_columns = requested_columns_copy.size(); - requested_columns_copy = requested_columns_copy.eraseNames(constant_columns); - if (requested_columns_copy.size() + constant_columns.size() != original_columns) + if (!constant_columns.empty()) { - LOG_WARNING(log, "Can't remove constant columns for file {} correctly, falling back to full read. Found constant columns: [{}]", - object_info->getPath(), constant_columns); - requested_columns_copy = read_from_format_info.requested_columns; - constant_columns.clear(); - constant_columns_with_values.clear(); + size_t original_columns = requested_columns_copy.size(); + requested_columns_copy = requested_columns_copy.eraseNames(constant_columns); + if (requested_columns_copy.size() + constant_columns.size() != original_columns) + { + LOG_WARNING(log, "Can't remove constant columns for file {} correctly, falling back to full read. Found constant columns: [{}]", + object_info->getPath(), constant_columns); + requested_columns_copy = read_from_format_info.requested_columns; + constant_columns.clear(); + constant_columns_with_values.clear(); + } + else if (requested_columns_copy.empty()) + need_only_count = true; } } std::optional num_rows_from_cache From 8fb2aa232b279b7c497ea972681d8af9b77b1148 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 18 Sep 2025 14:58:54 +0200 Subject: [PATCH 4/9] Fix column indexes --- .../DataLakes/IDataLakeMetadata.cpp | 36 +++++-- .../DataLakes/IDataLakeMetadata.h | 9 +- .../DataLakes/Iceberg/IcebergMetadata.cpp | 5 +- .../StorageObjectStorageSource.cpp | 96 +++++++++---------- src/Storages/prepareReadingFromFormat.cpp | 1 - src/Storages/prepareReadingFromFormat.h | 2 - 6 files changed, 82 insertions(+), 67 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp index 2076d75f6fc7..7cdcabbb2076 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp @@ -92,12 +92,12 @@ DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info) { auto column = columns->getObject(static_cast(i)); - Int32 id; - if (column->has("id")) - id = column->get("id"); + std::string name; + if (column->has("name")) + name = column->get("name").toString(); else { - LOG_WARNING(log, "Can't read column id, ignored"); + LOG_WARNING(log, "Can't read column name, ignored"); continue; } @@ -117,20 +117,38 @@ DataFileMetaInfo::DataFileMetaInfo(Poco::JSON::Object::Ptr file_info) } catch (const Exception & e) { - LOG_WARNING(log, "Can't read range for column {}, range '{}' ignored, error: {}", id, r, e.what()); + LOG_WARNING(log, "Can't read range for column {}, range '{}' ignored, error: {}", name, r, e.what()); } } - columns_info[id] = column_info; + columns_info[name] = column_info; } } } -DataFileMetaInfo::DataFileMetaInfo(const std::unordered_map & columns_info_) +DataFileMetaInfo::DataFileMetaInfo( + const IcebergSchemaProcessor & schema_processor, + Int32 schema_id, + const std::unordered_map & columns_info_) { + std::vector column_ids; for (const auto & column : columns_info_) + column_ids.push_back(column.first); + auto name_and_types = schema_processor.tryGetFieldsCharacteristics(schema_id, column_ids); + std::unordered_map name_by_index; + for (const auto & name_and_type : name_and_types) { - columns_info[column.first] = {column.second.rows_count, column.second.nulls_count, column.second.hyperrectangle}; + const auto name = name_and_type.getNameInStorage(); + auto index = schema_processor.tryGetColumnIDByName(schema_id, name); + if (index.has_value()) + name_by_index[index.value()] = name; + } + + for (const auto & column : columns_info_) + { + auto i_name = name_by_index.find(column.first); + if (i_name != name_by_index.end()) + columns_info[i_name->second] = {column.second.rows_count, column.second.nulls_count, column.second.hyperrectangle}; } } @@ -145,7 +163,7 @@ Poco::JSON::Object::Ptr DataFileMetaInfo::toJson() const for (const auto & column : columns_info) { Poco::JSON::Object::Ptr column_info = new Poco::JSON::Object(); - column_info->set("id", column.first); + column_info->set("name", column.first); if (column.second.rows_count.has_value()) column_info->set("rows", column.second.rows_count.value()); if (column.second.nulls_count.has_value()) column_info->set("nulls", column.second.nulls_count.value()); diff --git
a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h index 5db8850ed5d4..c4228f57cc98 100644 --- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h @@ -8,6 +8,8 @@ #include #include +#include + namespace Iceberg { @@ -29,7 +31,10 @@ class DataFileMetaInfo DataFileMetaInfo() = default; // Extract metadata from Iceberg structure - explicit DataFileMetaInfo(const std::unordered_map & columns_info_); + explicit DataFileMetaInfo( + const IcebergSchemaProcessor & schema_processor, + Int32 schema_id, + const std::unordered_map & columns_info_); // Deserialize from json in distributed requests explicit DataFileMetaInfo(const Poco::JSON::Object::Ptr file_info); @@ -44,7 +49,7 @@ class DataFileMetaInfo std::optional hyperrectangle; }; - std::unordered_map columns_info; + std::unordered_map columns_info; }; using DataFileMetaInfoPtr = std::shared_ptr; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 4ccc01a36907..0e83d0547bfb 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -1125,7 +1125,10 @@ DataFileInfos IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag, C { data_files.push_back(DataFileInfo(std::get(manifest_file_entry.file).file_name)); if (use_iceberg_read_optimization) - data_files.back().file_meta_info = std::make_shared(manifest_file_entry.columns_infos); + data_files.back().file_meta_info = std::make_shared( + schema_processor, + relevant_snapshot_schema_id, + manifest_file_entry.columns_infos); } } } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 787191695990..3e4a0bf39c73 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -340,6 +340,11 @@ Chunk StorageObjectStorageSource::generate() { for (const auto & constant_column : reader.constant_columns_with_values) { + LOG_DEBUG(log, "Restore constant column '{}' index {} with value '{}'", + constant_column.second.name_and_type.name, + constant_column.first, + constant_column.second.value + ); chunk.addColumn(constant_column.first, constant_column.second.name_and_type.type->createColumnConst( chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst()); @@ -543,77 +548,64 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade std::map constant_columns_with_values; std::unordered_set constant_columns; + NamesAndTypesList requested_columns_copy = read_from_format_info.requested_columns; + std::unordered_map> requested_columns_list; { size_t column_index = 0; - for (const auto & column : read_from_format_info.requested_columns) + for (const auto & column : requested_columns_copy) requested_columns_list[column.getNameInStorage()] = std::make_pair(column_index++, column); } - std::unordered_map physical_columns_names; - Int32 column_counter = 0; - /// In Iceberg metadata columns' numbers starts from 1, so preincrement used - for (const auto & column : read_from_format_info.physical_columns) - physical_columns_names[++column_counter] = column.getNameInStorage(); - /// now column_counter contains maximum column index - - NamesAndTypesList requested_columns_copy = 
read_from_format_info.requested_columns; - if (context_->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]) { - auto file_meta_data = object_info->getFileMetaInfo(); - if (file_meta_data.has_value()) + auto schema = configuration->tryGetTableStructureFromMetadata(); + if (schema.has_value()) { - for (const auto & column : file_meta_data.value()->columns_info) + auto file_meta_data = object_info->getFileMetaInfo(); + if (file_meta_data.has_value()) { - if (column.second.hyperrectangle.has_value()) + for (const auto & column : file_meta_data.value()->columns_info) { - if (column.second.hyperrectangle.value().isPoint()) + if (column.second.hyperrectangle.has_value()) { - auto column_id = column.first; - - if (column_id <= 0 || column_id > column_counter) - { /// Something wrong, ignore file metadata - LOG_WARNING(log, "Incorrect column ID: {}, ignoring file metadata", column_id); - constant_columns.clear(); - break; + if (column.second.hyperrectangle.value().isPoint()) + { + auto column_name = column.first; + + auto i_column = requested_columns_list.find(column_name); + if (i_column == requested_columns_list.end()) + continue; + + /// isPoint() method checks that left==right + constant_columns_with_values[i_column->second.first] = + ConstColumnWithValue{ + i_column->second.second, + column.second.hyperrectangle.value().left + }; + constant_columns.insert(column_name); + + LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value '{}'", + object_info->getPath(), + column_name, + i_column->second.first, + i_column->second.second.type, + column.second.hyperrectangle.value().left.dump()); } - - const auto & column_name = physical_columns_names[column_id]; - - auto i_column = requested_columns_list.find(column_name); - if (i_column == requested_columns_list.end()) - continue; - - /// isPoint() method checks that left==right - constant_columns_with_values[i_column->second.first] = - ConstColumnWithValue{ - i_column->second.second, - column.second.hyperrectangle.value().left - }; - constant_columns.insert(column_name); - - LOG_DEBUG(log, "In file {} constant column {} with value {}", - object_info->getPath(), column_name, column.second.hyperrectangle.value().left.dump()); } } } - } - if (!constant_columns.empty()) - { - size_t original_columns = requested_columns_copy.size(); - requested_columns_copy = requested_columns_copy.eraseNames(constant_columns); - if (requested_columns_copy.size() + constant_columns.size() != original_columns) + if (!constant_columns.empty()) { - LOG_WARNING(log, "Can't remove constant columns for file {} correct, fallback to read. Founded constant columns: [{}]", - object_info->getPath(), constant_columns); - requested_columns_copy = read_from_format_info.requested_columns; - constant_columns.clear(); - constant_columns_with_values.clear(); + size_t original_columns = requested_columns_copy.size(); + requested_columns_copy = requested_columns_copy.eraseNames(constant_columns); + if (requested_columns_copy.size() + constant_columns.size() != original_columns) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't remove constant columns for file {} correct, fallback to read. 
Founded constant columns: [{}]", + object_info->getPath(), constant_columns); + if (requested_columns_copy.empty()) + need_only_count = true; } - else if (requested_columns_copy.empty()) - need_only_count = true; } } diff --git a/src/Storages/prepareReadingFromFormat.cpp b/src/Storages/prepareReadingFromFormat.cpp index 5033eed04c2d..5a99021221af 100644 --- a/src/Storages/prepareReadingFromFormat.cpp +++ b/src/Storages/prepareReadingFromFormat.cpp @@ -88,7 +88,6 @@ ReadFromFormatInfo prepareReadingFromFormat( /// Create header for InputFormat with columns that will be read from the data. info.format_header = storage_snapshot->getSampleBlockForColumns(info.columns_description.getNamesOfPhysical()); info.serialization_hints = getSerializationHintsForFileLikeStorage(storage_snapshot->metadata, context); - info.physical_columns = storage_snapshot->metadata->getColumns().getAllPhysical(); return info; } diff --git a/src/Storages/prepareReadingFromFormat.h b/src/Storages/prepareReadingFromFormat.h index 45383d6fe3b5..a5009eff89f2 100644 --- a/src/Storages/prepareReadingFromFormat.h +++ b/src/Storages/prepareReadingFromFormat.h @@ -26,8 +26,6 @@ namespace DB SerializationInfoByName serialization_hints; /// The list of hive partition columns. It shall be read from the path regardless if it is present in the file NamesAndTypesList hive_partition_columns_to_read_from_file_path; - /// The list of all physical columns is source. Required sometimes for some read optimization. - NamesAndTypesList physical_columns; }; struct PrepareReadingFromFormatHiveParams From bb2da6b0ad3d8876df24695ce49d4ccbe2dcf91e Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 18 Sep 2025 18:24:44 +0200 Subject: [PATCH 5/9] Optimization for NULLs, count form metadata, test --- src/Core/Settings.cpp | 2 +- src/Core/SettingsChangesHistory.cpp | 2 +- .../StorageObjectStorageSource.cpp | 146 ++++++++++--- .../integration/test_storage_iceberg/test.py | 205 ++++++++++++++++++ 4 files changed, 318 insertions(+), 37 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 6b7695b254a8..74f2f6ca4880 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -6906,7 +6906,7 @@ Allow retries in cluster request, when one node goes offline DECLARE(Bool, object_storage_remote_initiator, false, R"( Execute request to object storage as remote on one of object_storage_cluster nodes. )", EXPERIMENTAL) \ - DECLARE(Bool, allow_experimental_iceberg_read_optimization, false, R"( + DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"( Allow Iceberg read optimization based on Iceberg metadata. 
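The core of PATCH 4 is that per-file column statistics stop being matched to table columns by position and are instead re-keyed by column name resolved through the snapshot schema (the tryGetFieldsCharacteristics / tryGetColumnIDByName calls above). Positions are not stable under schema evolution — the test added in PATCH 5 reorders a column with ALTER COLUMN ... FIRST — so a positional match could attach one column's min/max to another. A minimal sketch of the re-keying idea, under simplified, hypothetical types (SchemaView is a stand-in for IcebergSchemaProcessor, not the real class):

    #include <map>
    #include <optional>
    #include <string>

    struct ColumnStats
    {
        std::optional<long> rows_count;
        std::optional<long> nulls_count;
    };

    struct SchemaView
    {
        std::map<int, std::string> name_by_id; // Iceberg field id -> name in this schema version

        std::optional<std::string> nameOf(int id) const
        {
            auto it = name_by_id.find(id);
            if (it == name_by_id.end())
                return std::nullopt;
            return it->second;
        }
    };

    // Re-key stats from field ids to column names; stats whose id is unknown
    // in the requested schema are dropped rather than attached to the wrong column.
    std::map<std::string, ColumnStats> rekeyStats(const SchemaView & schema, const std::map<int, ColumnStats> & by_id)
    {
        std::map<std::string, ColumnStats> by_name;
        for (const auto & [id, stats] : by_id)
            if (auto name = schema.nameOf(id))
                by_name[*name] = stats;
        return by_name;
    }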
)", EXPERIMENTAL) \ \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 8ec6e7b9e92c..e9cfa0086847 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -79,7 +79,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() {"object_storage_cluster_join_mode", "allow", "allow", "New setting"}, {"object_storage_remote_initiator", false, false, "New setting."}, {"allow_experimental_export_merge_tree_part", false, false, "New setting."}, - {"allow_experimental_iceberg_read_optimization", false, false, "New setting."} + {"allow_experimental_iceberg_read_optimization", true, true, "New setting."} }); addSettingsChanges(settings_changes_history, "25.6", { diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 3e4a0bf39c73..9744cdf2e302 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -523,9 +523,17 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade QueryPipelineBuilder builder; std::shared_ptr source; std::unique_ptr read_buf; + std::optional rows_count_from_metadata; auto try_get_num_rows_from_cache = [&]() -> std::optional { + if (rows_count_from_metadata.has_value()) + { + /// Must be non negative here + size_t value = rows_count_from_metadata.value(); + return value; + } + if (!schema_cache) return std::nullopt; @@ -559,54 +567,122 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade if (context_->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization]) { - auto schema = configuration->tryGetTableStructureFromMetadata(); - if (schema.has_value()) + auto file_meta_data = object_info->getFileMetaInfo(); + if (file_meta_data.has_value()) { - auto file_meta_data = object_info->getFileMetaInfo(); - if (file_meta_data.has_value()) + bool is_all_rows_count_equals = true; + for (const auto & column : file_meta_data.value()->columns_info) { - for (const auto & column : file_meta_data.value()->columns_info) + if (is_all_rows_count_equals && column.second.rows_count.has_value()) { - if (column.second.hyperrectangle.has_value()) + if (rows_count_from_metadata.has_value()) { - if (column.second.hyperrectangle.value().isPoint()) + if (column.second.rows_count.value() != rows_count_from_metadata.value()) { - auto column_name = column.first; - - auto i_column = requested_columns_list.find(column_name); - if (i_column == requested_columns_list.end()) - continue; - - /// isPoint() method checks that left==right - constant_columns_with_values[i_column->second.first] = - ConstColumnWithValue{ - i_column->second.second, - column.second.hyperrectangle.value().left - }; - constant_columns.insert(column_name); - - LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value '{}'", - object_info->getPath(), - column_name, - i_column->second.first, - i_column->second.second.type, - column.second.hyperrectangle.value().left.dump()); + LOG_WARNING(log, "Inconsistent rows count for file {} in metadats, ignored", object_info->getPath()); + is_all_rows_count_equals = false; + rows_count_from_metadata = std::nullopt; } } + else if (column.second.rows_count.value() < 0) + { + LOG_WARNING(log, "Negative rows count for file {} in metadats, ignored", object_info->getPath()); + is_all_rows_count_equals = false; + rows_count_from_metadata = std::nullopt; + } + else + 
@@ -559,54 +567,122 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade

     if (context_->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization])
     {
-        auto schema = configuration->tryGetTableStructureFromMetadata();
-        if (schema.has_value())
+        auto file_meta_data = object_info->getFileMetaInfo();
+        if (file_meta_data.has_value())
         {
-            auto file_meta_data = object_info->getFileMetaInfo();
-            if (file_meta_data.has_value())
+            bool is_all_rows_count_equals = true;
+            for (const auto & column : file_meta_data.value()->columns_info)
             {
-                for (const auto & column : file_meta_data.value()->columns_info)
+                if (is_all_rows_count_equals && column.second.rows_count.has_value())
                 {
-                    if (column.second.hyperrectangle.has_value())
+                    if (rows_count_from_metadata.has_value())
                     {
-                        if (column.second.hyperrectangle.value().isPoint())
+                        if (column.second.rows_count.value() != rows_count_from_metadata.value())
                         {
-                            auto column_name = column.first;
-
-                            auto i_column = requested_columns_list.find(column_name);
-                            if (i_column == requested_columns_list.end())
-                                continue;
-
-                            /// isPoint() method checks that left==right
-                            constant_columns_with_values[i_column->second.first] =
-                                ConstColumnWithValue{
-                                    i_column->second.second,
-                                    column.second.hyperrectangle.value().left
-                                };
-                            constant_columns.insert(column_name);
-
-                            LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value '{}'",
-                                object_info->getPath(),
-                                column_name,
-                                i_column->second.first,
-                                i_column->second.second.type,
-                                column.second.hyperrectangle.value().left.dump());
+                            LOG_WARNING(log, "Inconsistent rows count for file {} in metadata, ignored", object_info->getPath());
+                            is_all_rows_count_equals = false;
+                            rows_count_from_metadata = std::nullopt;
                         }
                     }
+                    else if (column.second.rows_count.value() < 0)
+                    {
+                        LOG_WARNING(log, "Negative rows count for file {} in metadata, ignored", object_info->getPath());
+                        is_all_rows_count_equals = false;
+                        rows_count_from_metadata = std::nullopt;
+                    }
+                    else
+                        rows_count_from_metadata = column.second.rows_count;
+                }
+                if (column.second.hyperrectangle.has_value())
+                {
+                    if (column.second.hyperrectangle.value().isPoint() &&
+                        (!column.second.nulls_count.has_value() || !column.second.nulls_count.value()))
+                    {
+                        auto column_name = column.first;
+
+                        auto i_column = requested_columns_list.find(column_name);
+                        if (i_column == requested_columns_list.end())
+                            continue;
+
+                        /// isPoint() method checks that left==right
+                        constant_columns_with_values[i_column->second.first] =
+                            ConstColumnWithValue{
+                                i_column->second.second,
+                                column.second.hyperrectangle.value().left
+                            };
+                        constant_columns.insert(column_name);
+
+                        LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value '{}'",
+                            object_info->getPath(),
+                            column_name,
+                            i_column->second.first,
+                            i_column->second.second.type,
+                            column.second.hyperrectangle.value().left.dump());
+                    }
+                    else if (column.second.rows_count.has_value() && column.second.nulls_count.has_value()
+                        && column.second.rows_count.value() == column.second.nulls_count.value())
+                    {
+                        auto column_name = column.first;
+
+                        auto i_column = requested_columns_list.find(column_name);
+                        if (i_column == requested_columns_list.end())
+                            continue;
+
+                        if (!i_column->second.second.type->isNullable())
+                            continue;
+
+                        constant_columns_with_values[i_column->second.first] =
+                            ConstColumnWithValue{
+                                i_column->second.second,
+                                Field()
+                            };
+                        constant_columns.insert(column_name);
+
+                        LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value 'NULL'",
+                            object_info->getPath(),
+                            column_name,
+                            i_column->second.first,
+                            i_column->second.second.type);
+                    }
                 }
             }

-            if (!constant_columns.empty())
+            for (const auto & column : requested_columns_list)
             {
-                size_t original_columns = requested_columns_copy.size();
-                requested_columns_copy = requested_columns_copy.eraseNames(constant_columns);
-                if (requested_columns_copy.size() + constant_columns.size() != original_columns)
-                    throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't remove constant columns correctly for file {}. Found constant columns: [{}]",
-                        object_info->getPath(), constant_columns);
-                if (requested_columns_copy.empty())
-                    need_only_count = true;
+                const auto & column_name = column.first;
+
+                if (file_meta_data.value()->columns_info.contains(column_name))
+                    continue;
+
+                if (!column.second.second.type->isNullable())
+                    continue;
+
+                /// Column is nullable and absent in file
+                constant_columns_with_values[column.second.first] =
+                    ConstColumnWithValue{
+                        column.second.second,
+                        Field()
+                    };
+                constant_columns.insert(column_name);
+
+                LOG_DEBUG(log, "In file {} constant column '{}' id {} type '{}' with value 'NULL'",
+                    object_info->getPath(),
+                    column_name,
+                    column.second.first,
+                    column.second.second.type);
             }
         }
+
+        if (!constant_columns.empty())
+        {
+            size_t original_columns = requested_columns_copy.size();
+            requested_columns_copy = requested_columns_copy.eraseNames(constant_columns);
+            if (requested_columns_copy.size() + constant_columns.size() != original_columns)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't remove constant columns correctly for file {}. Found constant columns: [{}]",
+                    object_info->getPath(), constant_columns);
+            if (requested_columns_copy.empty())
+                need_only_count = true;
+        }
     }
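Three constancy rules drive the hunk above: a degenerate [x, x] range (isPoint) with no recorded NULLs makes the column a constant value; rows_count == nulls_count on a Nullable column makes it a constant NULL; and a Nullable column with no statistics entry at all is treated as absent from the file and therefore constant NULL (PATCH 6 below disables that last branch). A minimal restatement of the first two rules over simplified stats — the Field/Range types here are stand-ins, not the real classes:

    #include <optional>

    struct Stats
    {
        std::optional<long> rows_count;
        std::optional<long> nulls_count;
        std::optional<double> min, max; // stand-in for the hyperrectangle bounds
    };

    enum class Constness { NotConstant, ConstValue, ConstNull };

    Constness classify(const Stats & s, bool type_is_nullable)
    {
        // Rule 1: a [x, x] range with no NULLs means every row holds the same value.
        if (s.min && s.max && *s.min == *s.max && (!s.nulls_count || *s.nulls_count == 0))
            return Constness::ConstValue;

        // Rule 2: nulls_count == rows_count means NULL throughout; only valid
        // when the ClickHouse type can actually store NULL.
        if (s.rows_count && s.nulls_count && *s.rows_count == *s.nulls_count && type_is_nullable)
            return Constness::ConstNull;

        return Constness::NotConstant;
    }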
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index d08ab8d6605e..3ed9c9ef2be5 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -3071,6 +3071,7 @@ def check_validity_and_get_prunned_files(select_expression):
         == 1
     )

+
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
 def test_explicit_metadata_file(started_cluster, storage_type):
     instance = started_cluster.instances["node1"]
@@ -3115,6 +3116,7 @@ def test_explicit_metadata_file(started_cluster, storage_type):
     with pytest.raises(Exception):
         create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="../metadata/v11.metadata.json")

+
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
 @pytest.mark.parametrize("run_on_cluster", [False, True])
 def test_minmax_pruning_with_null(started_cluster, storage_type, run_on_cluster):
@@ -3341,6 +3343,7 @@ def execute_spark_query(query: str):
         run_on_cluster=True,
     )

+
 @pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
 def test_minmax_pruning_for_arrays_and_maps_subfields_disabled(started_cluster, storage_type):
     instance = started_cluster.instances["node1"]
@@ -3410,6 +3413,7 @@ def execute_spark_query(query: str):

     instance.query(f"SELECT * FROM {table_select_expression} ORDER BY ALL")

+
 @pytest.mark.parametrize("storage_type", ["s3"])
 def test_system_tables_partition_sorting_keys(started_cluster, storage_type):
     instance = started_cluster.instances["node1"]
@@ -3455,6 +3459,7 @@ def test_system_tables_partition_sorting_keys(started_cluster, storage_type):

     assert res == '"bucket(16, id), day(ts)","iddescnulls last, hour(ts)ascnulls first"'

+
 @pytest.mark.parametrize("storage_type", ["local", "s3"])
 def test_compressed_metadata(started_cluster, storage_type):
     instance = started_cluster.instances["node1"]
@@ -3492,3 +3497,203 @@ def test_compressed_metadata(started_cluster, storage_type):
     create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="")

     assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"
+
+
+@pytest.mark.parametrize("storage_type", ["s3", "azure"])
+@pytest.mark.parametrize("run_on_cluster", [False, True])
+def test_read_constant_columns_optimization(started_cluster, storage_type, run_on_cluster):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    TABLE_NAME = "test_read_constant_columns_optimization_" + storage_type + "_" + get_uuid_str()
+
+    def execute_spark_query(query: str):
+        return execute_spark_query_general(
+            spark,
+            started_cluster,
+            storage_type,
+            TABLE_NAME,
+            query,
+        )
+
+    execute_spark_query(
+        f"""
+        CREATE TABLE {TABLE_NAME} (
+            tag INT,
+            date DATE,
+            date2 DATE,
+            name VARCHAR(50),
+            number BIGINT
+        )
+        USING iceberg
+        PARTITIONED BY (identity(tag), years(date))
+        OPTIONS('format-version'='2')
+        """
+    )
+
+    execute_spark_query(
+        f"""
+        INSERT INTO {TABLE_NAME} VALUES
+        (1, DATE '2024-01-20', DATE '2024-01-20', 'vasya', 5),
+        (1, DATE '2024-01-20', DATE '2024-01-20', 'vasilisa', 5),
+        (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5),
+        (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5),
+        (2, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 5),
+        (2, DATE '2025-01-21', DATE '2025-01-20', 'vasilisa', 5)
+        """
+    )
+
+    execute_spark_query(
+        f"""
+        ALTER TABLE {TABLE_NAME} ALTER COLUMN number FIRST;
+        """
+    )
+
+    execute_spark_query(
+        f"""
+        INSERT INTO {TABLE_NAME} VALUES
+        (5, 3, DATE '2025-01-20', DATE '2024-01-20', 'vasilisa'),
+        (5, 3, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa')
+        """
+    )
+
+    execute_spark_query(
+        f"""
+        ALTER TABLE {TABLE_NAME}
+        ADD COLUMNS (
+            name2 string
+        );
+        """
+    )
+
+    execute_spark_query(
+        f"""
+        INSERT INTO {TABLE_NAME} VALUES
+        (5, 4, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'),
+        (5, 4, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 'iceberg'),
+        (5, 5, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'),
+        (5, 5, DATE '2025-01-20', DATE '2024-01-20', 'vasilisa', 'icebreaker'),
+        (5, 6, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'),
+        (5, 6, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg')
+        """
+    )
+
+    # In total there must be 7 files
+    # Partitioned column 'tag' is constant in each file
+    # Column 'date' is constant in 6 files and has different values in (2-2025)
+    # Column 'date2' is constant in 4 files (1-2024, 2-2025, 5-2025, 6-2025)
+    # Column 'name' is constant in 3 files (1-2025, 2-2025, 6-2025)
+    # Column 'number' is globally constant
+    # Column 'name2' is present only in 3 files (4-2025, 5-2025, 6-2025), constant in two (4-2025, 6-2025)
+    # Files 1-2025 and 6-2025 have only constant columns
+
+    creation_expression = get_creation_expression(
+        storage_type, TABLE_NAME, started_cluster, table_function=True, run_on_cluster=run_on_cluster
+    )
+
+    # Warm up metadata cache
+    for replica in started_cluster.instances.values():
+        replica.query(f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0")
+
+    all_data_expected_query_id = str(uuid.uuid4())
+    all_data_expected = instance.query(
+        f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0",
+        query_id=all_data_expected_query_id,
+    )
+    const_only_expected_query_id = str(uuid.uuid4())
+    const_only_expected = instance.query(
+        f"SELECT tag, number FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0",
+        query_id=const_only_expected_query_id,
+    )
+    const_partial_expected_query_id = str(uuid.uuid4())
+    const_partial_expected = instance.query(
+        f"SELECT tag, date2, number, name FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0",
+        query_id=const_partial_expected_query_id,
+    )
+    const_partial2_expected_query_id = str(uuid.uuid4())
+    const_partial2_expected = instance.query(
+        f"SELECT tag, date2, number, name2 FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0",
+        query_id=const_partial2_expected_query_id,
+    )
+    count_expected_query_id = str(uuid.uuid4())
+    count_expected = instance.query(
+        f"SELECT count(),tag FROM {creation_expression} GROUP BY ALL ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0",
+        query_id=count_expected_query_id,
+    )
+
+    all_data_query_id = str(uuid.uuid4())
+    all_data_optimized = instance.query(
+        f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1",
+        query_id=all_data_query_id,
+    )
+    const_only_query_id = str(uuid.uuid4())
+    const_only_optimized = instance.query(
+        f"SELECT tag, number FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1",
+        query_id=const_only_query_id,
+    )
+    const_partial_query_id = str(uuid.uuid4())
+    const_partial_optimized = instance.query(
+        f"SELECT tag, date2, number, name FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1",
+        query_id=const_partial_query_id,
+    )
+    const_partial2_query_id = str(uuid.uuid4())
+    const_partial2_optimized = instance.query(
+        f"SELECT tag, date2, number, name2 FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1",
+        query_id=const_partial2_query_id,
+    )
+    count_query_id = str(uuid.uuid4())
+    count_optimized = instance.query(
+        f"SELECT count(),tag FROM {creation_expression} GROUP BY ALL ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1",
+        query_id=count_query_id,
+    )
+
+    assert all_data_expected == all_data_optimized
+    assert const_only_expected == const_only_optimized
+    assert const_partial_expected == const_partial_optimized
+    assert const_partial2_expected == const_partial2_optimized
+    assert count_expected == count_optimized
+
+    for replica in started_cluster.instances.values():
+        replica.query("SYSTEM FLUSH LOGS")
+
+    def get_events(query_id, event):
+        res = instance.query(
+            f"""
+            SELECT
+                sum(tupleElement(arrayJoin(ProfileEvents),2)) as value
+            FROM
+                clusterAllReplicas('cluster_simple', system.query_log)
+            WHERE
+                type='QueryFinish'
+                AND tupleElement(arrayJoin(ProfileEvents),1)='{event}'
+                AND initial_query_id='{query_id}'
+            GROUP BY ALL
+            FORMAT CSV
+            """)
+        return int(res)
+
+    event = "S3GetObject" if storage_type == "s3" else "AzureGetObject"
+
+    events_all_data_expected = get_events(all_data_expected_query_id, event)
+    events_const_only_expected = get_events(const_only_expected_query_id, event)
+    events_const_partial_expected = get_events(const_partial_expected_query_id, event)
+    events_const_partial2_expected = get_events(const_partial2_expected_query_id, event)
+    events_count_expected = get_events(count_expected_query_id, event)
+
+    # Without optimization ClickHouse reads all 7 files
+    assert events_all_data_expected == 7
+    assert events_const_only_expected == 7
+    assert events_const_partial_expected == 7
+    assert events_const_partial2_expected == 7
+    assert events_count_expected == 7
+
+    events_all_data_optimized = get_events(all_data_query_id, event)  # 1-2025, 6-2025 must not be read
+    events_const_only_optimized = get_events(const_only_query_id, event)  # All must not be read
+    events_const_partial_optimized = get_events(const_partial_query_id, event)  # 1-2025, 6-2025 and 2-2025 must not be read
+    events_const_partial2_optimized = get_events(const_partial2_query_id, event)  # 1-2024, 1-2025, 2-2025 and 6-2025 must not be read
+    events_count_optimized = get_events(count_query_id, event)  # All must not be read
+
+    assert events_all_data_optimized == 5
+    assert events_const_only_optimized == 0
+    assert events_const_partial_optimized == 4
+    assert events_const_partial2_optimized == 3
+    assert events_count_optimized == 0
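For files that do survive pruning, generate() (see PATCH 4 above and PATCH 8 below) re-attaches every pruned constant column to each produced chunk via createColumnConst(...)->convertToFullColumnIfConst(), so downstream processors never learn that the column was skipped during the file read. A simplified stand-in illustration of that splice (plain containers, not the real IColumn/Chunk):

    #include <cstddef>
    #include <string>
    #include <vector>

    using Column = std::vector<std::string>;

    struct Chunk
    {
        std::vector<Column> columns;
        size_t num_rows = 0;

        // Expand one constant to a full column of num_rows copies and insert it
        // at the position the query expects, the moral equivalent of
        // createColumnConst(...)->convertToFullColumnIfConst().
        void addConstColumn(size_t position, const std::string & value)
        {
            columns.insert(columns.begin() + static_cast<std::ptrdiff_t>(position), Column(num_rows, value));
        }
    };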
From f8caa7f8031406d90e6dd3d817628d638d0ef71c Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Thu, 18 Sep 2025 20:50:06 +0200
Subject: [PATCH 6/9] Remove optimization for NULLs

---
 src/Storages/ObjectStorage/StorageObjectStorageSource.cpp | 3 ++-
 tests/integration/test_storage_iceberg/test.py            | 6 +++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index 9744cdf2e302..2a06b91ad015 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -646,7 +646,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
                 }
             }
         }
-
+/*
             for (const auto & column : requested_columns_list)
             {
                 const auto & column_name = column.first;
@@ -671,6 +671,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
                     column.second.first,
                     column.second.second.type);
             }
+*/
         }

         if (!constant_columns.empty())
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index 3ed9c9ef2be5..2cb3d9c25db1 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -3689,11 +3689,11 @@ def get_events(query_id, event):
     events_all_data_optimized = get_events(all_data_query_id, event)  # 1-2025, 6-2025 must not be read
     events_const_only_optimized = get_events(const_only_query_id, event)  # All must not be read
     events_const_partial_optimized = get_events(const_partial_query_id, event)  # 1-2025, 6-2025 and 2-2025 must not be read
-    events_const_partial2_optimized = get_events(const_partial2_query_id, event)  # 1-2024, 1-2025, 2-2025 and 6-2025 must not be read
+    events_const_partial2_optimized = get_events(const_partial2_query_id, event)  # 6-2025 must not be read; 1-2024, 1-2025, 2-2025 are read because of NULLs
     events_count_optimized = get_events(count_query_id, event)  # All must not be read

-    assert events_all_data_optimized == 5
+    assert events_all_data_optimized == 6  # was 5 with the NULL optimization
     assert events_const_only_optimized == 0
     assert events_const_partial_optimized == 4
-    assert events_const_partial2_optimized == 3
+    assert events_const_partial2_optimized == 6
     assert events_count_optimized == 0

From 825fe685aac11ac073d69a15e74cee6ff38b9a99 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin
Date: Mon, 29 Sep 2025 12:12:28 +0200
Subject: [PATCH 7/9] Ignore negative nulls count

---
 src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
index 7cdcabbb2076..1e36e24c3a53 100644
--- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.cpp
@@ -148,7 +148,12 @@ DataFileMetaInfo::DataFileMetaInfo(
     {
         auto i_name = name_by_index.find(column.first);
         if (i_name != name_by_index.end())
-            columns_info[i_name->second] = {column.second.rows_count, column.second.nulls_count, column.second.hyperrectangle};
+        {
+            if (column.second.nulls_count.has_value() && column.second.nulls_count.value() >= 0)
+                columns_info[i_name->second] = {column.second.rows_count, column.second.nulls_count, column.second.hyperrectangle};
+            else
+                columns_info[i_name->second] = {column.second.rows_count, std::nullopt, column.second.hyperrectangle};
+        }
     }
 }
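The guard PATCH 7 adds reads naturally as a standalone sanitization step: a negative nulls count coming out of the manifest is demoted to "unknown" at parse time, so the constancy and row-count checks never see it. As a compact, hypothetical helper (the real code inlines this in the DataFileMetaInfo constructor, as shown above):

    #include <optional>

    inline std::optional<long> sanitizeNullsCount(std::optional<long> nulls_count)
    {
        if (nulls_count && *nulls_count < 0)
            return std::nullopt; // negative counts are corrupt; treat as unknown
        return nulls_count;
    }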
From 3c77ee2ce8990cb92800e2e63e184872c38c192a Mon Sep 17 00:00:00 2001
From: Anton Ivashkin <56930273+ianton-ru@users.noreply.github.com>
Date: Tue, 30 Sep 2025 14:50:41 +0200
Subject: [PATCH 8/9] Remove debug record on each chunk

---
 src/Storages/ObjectStorage/StorageObjectStorageSource.cpp | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index 2a06b91ad015..3e124b3c6f8a 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -340,11 +340,6 @@ Chunk StorageObjectStorageSource::generate()
         {
             for (const auto & constant_column : reader.constant_columns_with_values)
             {
-                LOG_DEBUG(log, "Restore constant column '{}' index {} with value '{}'",
-                    constant_column.second.name_and_type.name,
-                    constant_column.first,
-                    constant_column.second.value
-                );
                 chunk.addColumn(constant_column.first,
                     constant_column.second.name_and_type.type->createColumnConst(
                         chunk.getNumRows(), constant_column.second.value)->convertToFullColumnIfConst());

From fd23354bad3a6390d12705b070ea88b2908e3118 Mon Sep 17 00:00:00 2001
From: Anton Ivashkin <56930273+ianton-ru@users.noreply.github.com>
Date: Tue, 30 Sep 2025 15:46:54 +0200
Subject: [PATCH 9/9] Fix build

---
 src/Core/SettingsChangesHistory.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index c949c7b5389a..0d1dd9cf0835 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -79,7 +79,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
         {"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
         {"object_storage_remote_initiator", false, false, "New setting."},
         {"allow_experimental_export_merge_tree_part", false, false, "New setting."},
-        {"allow_experimental_iceberg_read_optimization", true, true, "New setting."}
+        {"allow_experimental_iceberg_read_optimization", true, true, "New setting."},
         {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
     });
     addSettingsChanges(settings_changes_history, "25.6",