diff --git a/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.cpp b/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.cpp deleted file mode 100644 index 75eeb15a5190..000000000000 --- a/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.cpp +++ /dev/null @@ -1,525 +0,0 @@ -#include -#include - -#if USE_PARQUET - -#include -#include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - -namespace -{ - -bool isParquetStringTypeSupportedForBloomFilters( - const std::shared_ptr & logical_type, - parquet::ConvertedType::type converted_type) -{ - if (logical_type && - !logical_type->is_none() - && !(logical_type->is_string() || logical_type->is_BSON() || logical_type->is_JSON())) - { - return false; - } - - if (parquet::ConvertedType::type::NONE != converted_type && - !(converted_type == parquet::ConvertedType::JSON || converted_type == parquet::ConvertedType::UTF8 - || converted_type == parquet::ConvertedType::BSON)) - { - return false; - } - - return true; -} - -bool isParquetIntegerTypeSupportedForBloomFilters(const std::shared_ptr & logical_type, parquet::ConvertedType::type converted_type) -{ - if (logical_type && !logical_type->is_none() && !logical_type->is_int()) - { - return false; - } - - if (parquet::ConvertedType::type::NONE != converted_type && !(converted_type == parquet::ConvertedType::INT_8 || converted_type == parquet::ConvertedType::INT_16 - || converted_type == parquet::ConvertedType::INT_32 || converted_type == parquet::ConvertedType::INT_64 - || converted_type == parquet::ConvertedType::UINT_8 || converted_type == parquet::ConvertedType::UINT_16 - || converted_type == parquet::ConvertedType::UINT_32 || converted_type == parquet::ConvertedType::UINT_64)) - { - return false; - } - - return true; -} - -template -uint64_t hashSpecialFLBATypes(const Field & field) -{ - const T & value = field.safeGet(); - - parquet::FLBA flba(reinterpret_cast(&value)); - - parquet::XxHasher hasher; - - return hasher.Hash(&flba, sizeof(T)); -}; - -std::optional tryHashStringWithoutCompatibilityCheck(const Field & field) -{ - const auto field_type = field.getType(); - - if (field_type != Field::Types::Which::String) - { - return std::nullopt; - } - - parquet::XxHasher hasher; - parquet::ByteArray ba { field.safeGet() }; - - return hasher.Hash(&ba); -} - -std::optional tryHashString( - const Field & field, - const std::shared_ptr & logical_type, - parquet::ConvertedType::type converted_type) -{ - if (!isParquetStringTypeSupportedForBloomFilters(logical_type, converted_type)) - { - return std::nullopt; - } - - return tryHashStringWithoutCompatibilityCheck(field); -} - -std::optional tryHashFLBA( - const Field & field, - const std::shared_ptr & logical_type, - parquet::ConvertedType::type converted_type, - std::size_t parquet_column_length) -{ - if (!isParquetStringTypeSupportedForBloomFilters(logical_type, converted_type)) - { - return std::nullopt; - } - - const auto field_type = field.getType(); - - if (field_type == Field::Types::Which::IPv6 && parquet_column_length == sizeof(IPv6)) - { - return hashSpecialFLBATypes(field); - } - - return tryHashStringWithoutCompatibilityCheck(field); -} - -template -std::optional tryHashInt(const Field & field, const std::shared_ptr & logical_type, parquet::ConvertedType::type converted_type) -{ - if (!isParquetIntegerTypeSupportedForBloomFilters(logical_type, converted_type)) - { - return std::nullopt; - } - - parquet::XxHasher hasher; - - if (field.getType() == Field::Types::Which::Int64) - { - return hasher.Hash(static_cast(field.safeGet())); - } - else if (field.getType() == Field::Types::Which::UInt64) - { - return hasher.Hash(static_cast(field.safeGet())); - } - else if (field.getType() == Field::Types::IPv4) - { - /* - * In theory, we could accept IPv4 over 64 bits variables. It would only be a problem in case it was hashed using the byte array api - * with a zero-ed buffer that had a 32 bits variable copied into it. - * - * To be on the safe side, accept only in case physical type is 32 bits. - * */ - if constexpr (std::is_same_v) - { - return hasher.Hash(static_cast(field.safeGet())); - } - } - - return std::nullopt; -} - -std::optional tryHash(const Field & field, const parquet::ColumnDescriptor * parquet_column_descriptor) -{ - const auto physical_type = parquet_column_descriptor->physical_type(); - const auto & logical_type = parquet_column_descriptor->logical_type(); - const auto converted_type = parquet_column_descriptor->converted_type(); - - switch (physical_type) - { - case parquet::Type::type::INT32: - return tryHashInt(field, logical_type, converted_type); - case parquet::Type::type::INT64: - return tryHashInt(field, logical_type, converted_type); - case parquet::Type::type::BYTE_ARRAY: - return tryHashString(field, logical_type, converted_type); - case parquet::Type::type::FIXED_LEN_BYTE_ARRAY: - return tryHashFLBA(field, logical_type, converted_type, parquet_column_descriptor->type_length()); - default: - return std::nullopt; - } -} - -std::optional> hash(const IColumn * data_column, const parquet::ColumnDescriptor * parquet_column_descriptor) -{ - std::vector hashes; - - for (size_t i = 0u; i < data_column->size(); i++) - { - Field f; - data_column->get(i, f); - - auto hashed_value = tryHash(f, parquet_column_descriptor); - - if (!hashed_value) - { - return std::nullopt; - } - - hashes.emplace_back(*hashed_value); - } - - return hashes; -} - -bool maybeTrueOnBloomFilter(const std::vector & hashes, const std::unique_ptr & bloom_filter) -{ - for (const auto hash : hashes) - { - if (bloom_filter->FindHash(hash)) - { - return true; - } - } - - return false; -} - -const parquet::ColumnDescriptor * getColumnDescriptorIfBloomFilterIsPresent( - const std::unique_ptr & parquet_rg_metadata, - const std::vector & clickhouse_column_index_to_parquet_index, - std::size_t clickhouse_column_index) -{ - if (clickhouse_column_index_to_parquet_index.size() <= clickhouse_column_index) - { - return nullptr; - } - - const auto & parquet_indexes = clickhouse_column_index_to_parquet_index[clickhouse_column_index].parquet_indexes; - - // complex types like structs, tuples and maps will have more than one index. - // we don't support those for now - if (parquet_indexes.size() > 1) - { - return nullptr; - } - - if (parquet_indexes.empty()) - { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Something bad happened, raise an issue and try the query with `input_format_parquet_bloom_filter_push_down=false`"); - } - - auto parquet_column_index = parquet_indexes[0]; - - const auto * parquet_column_descriptor = parquet_rg_metadata->schema()->Column(parquet_column_index); - - bool column_has_bloom_filter = parquet_rg_metadata->ColumnChunk(parquet_column_index)->bloom_filter_offset().has_value(); - if (!column_has_bloom_filter) - { - return nullptr; - } - - return parquet_column_descriptor; -} - -} - -ParquetBloomFilterCondition::ParquetBloomFilterCondition(const std::vector & condition_, const Block & header_) - : condition(condition_), header(header_) -{ -} - -bool ParquetBloomFilterCondition::mayBeTrueOnRowGroup(const ColumnIndexToBF & column_index_to_column_bf) const -{ - using Function = ConditionElement::Function; - std::vector rpn_stack; - - for (const auto & element : condition) - { - if (element.function == Function::FUNCTION_IN - || element.function == Function::FUNCTION_NOT_IN) - { - bool maybe_true = true; - for (auto column_index = 0u; column_index < element.hashes_per_column.size(); column_index++) - { - // in case bloom filter is not present for this row group - // https://github.com/ClickHouse/ClickHouse/pull/62966#discussion_r1722361237 - if (!column_index_to_column_bf.contains(element.key_columns[column_index])) - { - rpn_stack.emplace_back(true, true); - continue; - } - - bool column_maybe_contains = maybeTrueOnBloomFilter( - element.hashes_per_column[column_index], - column_index_to_column_bf.at(element.key_columns[column_index])); - - if (!column_maybe_contains) - { - maybe_true = false; - break; - } - } - - rpn_stack.emplace_back(maybe_true, true); - if (element.function == Function::FUNCTION_NOT_IN) - rpn_stack.back() = !rpn_stack.back(); - } - else if (element.function == Function::FUNCTION_NOT) - { - rpn_stack.back() = !rpn_stack.back(); - } - else if (element.function == Function::FUNCTION_OR) - { - auto arg1 = rpn_stack.back(); - rpn_stack.pop_back(); - auto arg2 = rpn_stack.back(); - rpn_stack.back() = arg1 | arg2; - } - else if (element.function == Function::FUNCTION_AND) - { - auto arg1 = rpn_stack.back(); - rpn_stack.pop_back(); - auto arg2 = rpn_stack.back(); - rpn_stack.back() = arg1 & arg2; - } - else if (element.function == Function::ALWAYS_TRUE) - { - rpn_stack.emplace_back(true, false); - } - else if (element.function == Function::ALWAYS_FALSE) - { - rpn_stack.emplace_back(false, true); - } - else - { - rpn_stack.emplace_back(true, true); - } - } - - if (rpn_stack.size() != 1) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected stack size in KeyCondition::mayBeTrueOnRowGroup"); - - return rpn_stack[0].can_be_true; -} - -std::unordered_set ParquetBloomFilterCondition::getFilteringColumnKeys() const -{ - std::unordered_set column_keys; - - for (const auto & element : condition) - { - for (const auto index : element.key_columns) - { - column_keys.insert(index); - } - } - - return column_keys; -} - -/* - * `KeyCondition::rpn` is overly complex for bloom filters, some operations are not even supported. Not only that, but to avoid hashing each time - * we loop over a rpn element, we need to store hashes instead of where predicate values. To address this, we loop over `KeyCondition::rpn` - * and build a simplified RPN that holds hashes instead of values. - * - * `KeyCondition::RPNElement::FUNCTION_IN_RANGE` becomes: - * `FUNCTION_IN` - * `FUNCTION_UNKNOWN` when range limits are different - * `KeyCondition::RPNElement::FUNCTION_IN_SET` becomes - * `FUNCTION_IN` - * - * Complex types and structs are not supported. - * There are two sources of data types being analyzed, and they need to be compatible: DB::Field type and parquet type. - * This is determined by the `isColumnSupported` method. - * - * Some interesting examples: - * 1. file(..., 'str_column UInt64') where str_column = 50; Field.type == UInt64. Parquet type string. Not supported. - * 2. file(...) where str_column = 50; Field.type == String (conversion already taken care by `KeyCondition`). Parquet type string. - * 3. file(...) where uint32_column = toIPv4(5). Field.type == IPv4. Incompatible column types, resolved by `KeyCondition` itself. - * 4. file(...) where toIPv4(uint32_column) = toIPv4(5). Field.type == IPv4. We know it is safe to hash it using an int32 API. - * */ -std::vector keyConditionRPNToParquetBloomFilterCondition( - const std::vector & rpn, - const std::vector & clickhouse_column_index_to_parquet_index, - const std::unique_ptr & parquet_rg_metadata) -{ - std::vector condition_elements; - - using RPNElement = KeyCondition::RPNElement; - using Function = ParquetBloomFilterCondition::ConditionElement::Function; - - for (const auto & rpn_element : rpn) - { - // this would be a problem for `where negate(x) = -58`. - // It would perform a bf search on `-58`, and possibly miss row groups containing this data. - if (!rpn_element.monotonic_functions_chain.empty()) - { - condition_elements.emplace_back(Function::FUNCTION_UNKNOWN); - continue; - } - - ParquetBloomFilterCondition::ConditionElement::HashesForColumns hashes; - - if (rpn_element.function == RPNElement::FUNCTION_IN_RANGE - || rpn_element.function == RPNElement::FUNCTION_NOT_IN_RANGE) - { - // Only FUNCTION_EQUALS is supported and for that extremes need to be the same - if (rpn_element.range.left != rpn_element.range.right) - { - condition_elements.emplace_back(Function::FUNCTION_UNKNOWN); - continue; - } - - const auto * parquet_column_descriptor = - getColumnDescriptorIfBloomFilterIsPresent(parquet_rg_metadata, clickhouse_column_index_to_parquet_index, rpn_element.key_column); - - if (!parquet_column_descriptor) - { - condition_elements.emplace_back(Function::FUNCTION_UNKNOWN); - continue; - } - - auto hashed_value = tryHash(rpn_element.range.left, parquet_column_descriptor); - - if (!hashed_value) - { - condition_elements.emplace_back(Function::FUNCTION_UNKNOWN); - continue; - } - - std::vector hashes_for_column; - hashes_for_column.emplace_back(*hashed_value); - - hashes.emplace_back(std::move(hashes_for_column)); - - auto function = rpn_element.function == RPNElement::FUNCTION_IN_RANGE - ? ParquetBloomFilterCondition::ConditionElement::Function::FUNCTION_IN - : ParquetBloomFilterCondition::ConditionElement::Function::FUNCTION_NOT_IN; - - std::vector key_columns; - key_columns.emplace_back(rpn_element.key_column); - - condition_elements.emplace_back(function, std::move(hashes), std::move(key_columns)); - } - else if (rpn_element.function == RPNElement::FUNCTION_IN_SET - || rpn_element.function == RPNElement::FUNCTION_NOT_IN_SET) - { - const auto & set_index = rpn_element.set_index; - const auto & ordered_set = set_index->getOrderedSet(); - const auto & indexes_mapping = set_index->getIndexesMapping(); - bool found_empty_column = false; - - std::vector key_columns; - - for (auto i = 0u; i < ordered_set.size(); i++) - { - const auto & set_column = ordered_set[i]; - - const auto * parquet_column_descriptor = getColumnDescriptorIfBloomFilterIsPresent( - parquet_rg_metadata, - clickhouse_column_index_to_parquet_index, - indexes_mapping[i].key_index); - - if (!parquet_column_descriptor) - { - continue; - } - - auto column = set_column; - - if (column->empty()) - { - found_empty_column = true; - break; - } - - if (const auto & nullable_column = checkAndGetColumn(set_column.get())) - { - column = nullable_column->getNestedColumnPtr(); - } - - auto hashes_for_column_opt = hash(column.get(), parquet_column_descriptor); - - if (!hashes_for_column_opt) - { - continue; - } - - auto & hashes_for_column = *hashes_for_column_opt; - - if (hashes_for_column.empty()) - { - continue; - } - - hashes.emplace_back(hashes_for_column); - - key_columns.push_back(indexes_mapping[i].key_index); - } - - if (found_empty_column) - { - condition_elements.emplace_back(Function::ALWAYS_FALSE); - continue; - } - - if (hashes.empty()) - { - condition_elements.emplace_back(Function::FUNCTION_UNKNOWN); - continue; - } - - auto function = RPNElement::FUNCTION_IN_SET == rpn_element.function ? Function::FUNCTION_IN : Function::FUNCTION_NOT_IN; - - condition_elements.emplace_back(function, hashes, key_columns); - } - else if (rpn_element.function == RPNElement::FUNCTION_NOT) - { - condition_elements.emplace_back(Function::FUNCTION_NOT); - } - else if (rpn_element.function == RPNElement::FUNCTION_OR) - { - condition_elements.emplace_back(Function::FUNCTION_OR); - } - else if (rpn_element.function == RPNElement::FUNCTION_AND) - { - condition_elements.emplace_back(Function::FUNCTION_AND); - } - else - { - condition_elements.emplace_back(Function::ALWAYS_TRUE); - } - } - - return condition_elements; -} - -} - -#endif diff --git a/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h b/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h deleted file mode 100644 index 6de6030b23cb..000000000000 --- a/src/Processors/Formats/Impl/Parquet/ParquetBloomFilterCondition.h +++ /dev/null @@ -1,73 +0,0 @@ -#pragma once - -#include - -#if USE_PARQUET - -#include -#include -#include - -namespace parquet -{ -class BloomFilter; -} - -namespace DB -{ - -class ParquetBloomFilterCondition -{ -public: - - struct ConditionElement - { - enum Function - { - /// Atoms of a Boolean expression. - FUNCTION_IN, - FUNCTION_NOT_IN, - /// Can take any value. - FUNCTION_UNKNOWN, - /// Operators of the logical expression. - FUNCTION_NOT, - FUNCTION_AND, - FUNCTION_OR, - /// Constants - ALWAYS_FALSE, - ALWAYS_TRUE, - }; - - using ColumnPtr = IColumn::Ptr; - using HashesForColumns = std::vector>; - using KeyColumns = std::vector; - - Function function; - // each entry represents a list of hashes per column - // suppose there are three columns with 2 rows each - // hashes_per_column.size() == 3 and hashes_per_column[0].size() == 2 - HashesForColumns hashes_per_column; - KeyColumns key_columns; - }; - - using RPNElement = KeyCondition::RPNElement; - using ColumnIndexToBF = std::unordered_map>; - - explicit ParquetBloomFilterCondition(const std::vector & condition_, const Block & header_); - - bool mayBeTrueOnRowGroup(const ColumnIndexToBF & column_index_to_column_bf) const; - std::unordered_set getFilteringColumnKeys() const; - -private: - std::vector condition; - Block header; -}; - -std::vector keyConditionRPNToParquetBloomFilterCondition( - const std::vector & rpn, - const std::vector & clickhouse_column_index_to_parquet_index, - const std::unique_ptr & parquet_rg_metadata); - -} - -#endif diff --git a/src/Processors/Formats/Impl/Parquet/parquetBloomFilterHash.cpp b/src/Processors/Formats/Impl/Parquet/parquetBloomFilterHash.cpp new file mode 100644 index 000000000000..1686a2d8f5b3 --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/parquetBloomFilterHash.cpp @@ -0,0 +1,191 @@ +#include + +#if USE_PARQUET + +#include +#include + +namespace DB +{ + +bool isParquetStringTypeSupportedForBloomFilters( + const std::shared_ptr & logical_type, + parquet::ConvertedType::type converted_type) +{ + if (logical_type && + !logical_type->is_none() + && !(logical_type->is_string() || logical_type->is_BSON() || logical_type->is_JSON())) + { + return false; + } + + if (parquet::ConvertedType::type::NONE != converted_type && + !(converted_type == parquet::ConvertedType::JSON || converted_type == parquet::ConvertedType::UTF8 + || converted_type == parquet::ConvertedType::BSON)) + { + return false; + } + + return true; +} + +bool isParquetIntegerTypeSupportedForBloomFilters(const std::shared_ptr & logical_type, parquet::ConvertedType::type converted_type) +{ + if (logical_type && !logical_type->is_none() && !logical_type->is_int()) + { + return false; + } + + if (parquet::ConvertedType::type::NONE != converted_type && !(converted_type == parquet::ConvertedType::INT_8 || converted_type == parquet::ConvertedType::INT_16 + || converted_type == parquet::ConvertedType::INT_32 || converted_type == parquet::ConvertedType::INT_64 + || converted_type == parquet::ConvertedType::UINT_8 || converted_type == parquet::ConvertedType::UINT_16 + || converted_type == parquet::ConvertedType::UINT_32 || converted_type == parquet::ConvertedType::UINT_64)) + { + return false; + } + + return true; +} + +template +uint64_t hashSpecialFLBATypes(const Field & field) +{ + const T & value = field.safeGet(); + + parquet::FLBA flba(reinterpret_cast(&value)); + + parquet::XxHasher hasher; + + return hasher.Hash(&flba, sizeof(T)); +}; + +std::optional tryHashStringWithoutCompatibilityCheck(const Field & field) +{ + const auto field_type = field.getType(); + + if (field_type != Field::Types::Which::String) + { + return std::nullopt; + } + + parquet::XxHasher hasher; + parquet::ByteArray ba { field.safeGet() }; + + return hasher.Hash(&ba); +} + +std::optional tryHashString( + const Field & field, + const std::shared_ptr & logical_type, + parquet::ConvertedType::type converted_type) +{ + if (!isParquetStringTypeSupportedForBloomFilters(logical_type, converted_type)) + { + return std::nullopt; + } + + return tryHashStringWithoutCompatibilityCheck(field); +} + +std::optional tryHashFLBA( + const Field & field, + const std::shared_ptr & logical_type, + parquet::ConvertedType::type converted_type, + std::size_t parquet_column_length) +{ + if (!isParquetStringTypeSupportedForBloomFilters(logical_type, converted_type)) + { + return std::nullopt; + } + + const auto field_type = field.getType(); + + if (field_type == Field::Types::Which::IPv6 && parquet_column_length == sizeof(IPv6)) + { + return hashSpecialFLBATypes(field); + } + + return tryHashStringWithoutCompatibilityCheck(field); +} + +template +std::optional tryHashInt(const Field & field, const std::shared_ptr & logical_type, parquet::ConvertedType::type converted_type) +{ + if (!isParquetIntegerTypeSupportedForBloomFilters(logical_type, converted_type)) + { + return std::nullopt; + } + + parquet::XxHasher hasher; + + if (field.getType() == Field::Types::Which::Int64) + { + return hasher.Hash(static_cast(field.safeGet())); + } + else if (field.getType() == Field::Types::Which::UInt64) + { + return hasher.Hash(static_cast(field.safeGet())); + } + else if (field.getType() == Field::Types::IPv4) + { + /* + * In theory, we could accept IPv4 over 64 bits variables. It would only be a problem in case it was hashed using the byte array api + * with a zero-ed buffer that had a 32 bits variable copied into it. + * + * To be on the safe side, accept only in case physical type is 32 bits. + * */ + if constexpr (std::is_same_v) + { + return hasher.Hash(static_cast(field.safeGet())); + } + } + + return std::nullopt; +} + +std::optional parquetTryHashField(const Field & field, const parquet::ColumnDescriptor * parquet_column_descriptor) +{ + const auto physical_type = parquet_column_descriptor->physical_type(); + const auto & logical_type = parquet_column_descriptor->logical_type(); + const auto converted_type = parquet_column_descriptor->converted_type(); + + switch (physical_type) + { + case parquet::Type::type::INT32: + return tryHashInt(field, logical_type, converted_type); + case parquet::Type::type::INT64: + return tryHashInt(field, logical_type, converted_type); + case parquet::Type::type::BYTE_ARRAY: + return tryHashString(field, logical_type, converted_type); + case parquet::Type::type::FIXED_LEN_BYTE_ARRAY: + return tryHashFLBA(field, logical_type, converted_type, parquet_column_descriptor->type_length()); + default: + return std::nullopt; + } +} + +std::optional> parquetTryHashColumn(const IColumn * data_column, const parquet::ColumnDescriptor * parquet_column_descriptor) +{ + std::vector hashes; + + for (size_t i = 0u; i < data_column->size(); i++) + { + Field f; + data_column->get(i, f); + + auto hashed_value = parquetTryHashField(f, parquet_column_descriptor); + + if (!hashed_value) + { + return std::nullopt; + } + + hashes.emplace_back(*hashed_value); + } + + return hashes; +} + +} + +#endif diff --git a/src/Processors/Formats/Impl/Parquet/parquetBloomFilterHash.h b/src/Processors/Formats/Impl/Parquet/parquetBloomFilterHash.h new file mode 100644 index 000000000000..66ed05a23296 --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/parquetBloomFilterHash.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#if USE_PARQUET + +#include + +namespace DB +{ + +/* + * Try to hash a ClickHouse field, nullopt in case it can't be done + * */ +std::optional parquetTryHashField(const Field & field, const parquet::ColumnDescriptor * parquet_column_descriptor); + + +/* + * Try to hash elements in a ClickHouse column; Will return std::nullopt in case one of them can't be hashed + * */ +std::optional> parquetTryHashColumn(const IColumn * data_column, const parquet::ColumnDescriptor * parquet_column_descriptor); + +} + +#endif diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index bfab82148d32..d6ce2d81b750 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -27,7 +27,7 @@ #include #include #include -#include +#include #include namespace CurrentMetrics @@ -46,6 +46,7 @@ namespace ErrorCodes extern const int INCORRECT_DATA; extern const int CANNOT_READ_ALL_DATA; extern const int CANNOT_PARSE_NUMBER; + extern const int LOGICAL_ERROR; } #define THROW_ARROW_NOT_OK(status) \ @@ -267,7 +268,29 @@ static Field decodePlainParquetValueSlow(const std::string & data, parquet::Type return field; } -static ParquetBloomFilterCondition::ColumnIndexToBF buildColumnIndexToBF( +struct ParquetBloomFilter final : public KeyCondition::BloomFilter +{ + explicit ParquetBloomFilter(std::unique_ptr && parquet_bf_) + : parquet_bf(std::move(parquet_bf_)) {} + + bool findAnyHash(const std::vector & hashes) override + { + for (const auto hash : hashes) + { + if (parquet_bf->FindHash(hash)) + { + return true; + } + } + + return false; + } + +private: + std::unique_ptr parquet_bf; +}; + +static KeyCondition::ColumnIndexToBloomFilter buildColumnIndexToBF( parquet::BloomFilterReader & bf_reader, int row_group, const std::vector & clickhouse_column_index_to_parquet_index, @@ -281,7 +304,7 @@ static ParquetBloomFilterCondition::ColumnIndexToBF buildColumnIndexToBF( return {}; } - ParquetBloomFilterCondition::ColumnIndexToBF index_to_column_bf; + KeyCondition::ColumnIndexToBloomFilter index_to_column_bf; for (const auto & [clickhouse_index, parquet_indexes] : clickhouse_column_index_to_parquet_index) { @@ -298,14 +321,14 @@ static ParquetBloomFilterCondition::ColumnIndexToBF buildColumnIndexToBF( auto parquet_index = parquet_indexes[0]; - auto bf = rg_bf->GetColumnBloomFilter(parquet_index); + auto parquet_bf = rg_bf->GetColumnBloomFilter(parquet_index); - if (!bf) + if (!parquet_bf) { continue; } - index_to_column_bf[clickhouse_index] = std::move(bf); + index_to_column_bf[clickhouse_index] = std::make_unique(std::move(parquet_bf)); } return index_to_column_bf; @@ -473,6 +496,61 @@ static std::vector getHyperrectangleForRowGroup(const parquet::FileMetaDa return hyperrectangle; } +std::unordered_set getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn) +{ + std::unordered_set column_keys; + + for (const auto & element : rpn) + { + if (auto bf_data = element.bloom_filter_data) + { + for (const auto index : bf_data->key_columns) + { + column_keys.insert(index); + } + } + } + + return column_keys; +} + +const parquet::ColumnDescriptor * getColumnDescriptorIfBloomFilterIsPresent( + const std::unique_ptr & parquet_rg_metadata, + const std::vector & clickhouse_column_index_to_parquet_index, + std::size_t clickhouse_column_index) +{ + if (clickhouse_column_index_to_parquet_index.size() <= clickhouse_column_index) + { + return nullptr; + } + + const auto & parquet_indexes = clickhouse_column_index_to_parquet_index[clickhouse_column_index].parquet_indexes; + + // complex types like structs, tuples and maps will have more than one index. + // we don't support those for now + if (parquet_indexes.size() > 1) + { + return nullptr; + } + + if (parquet_indexes.empty()) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Column maps to 0 parquet leaf columns, raise an issue and try the query with `input_format_parquet_bloom_filter_push_down=false`"); + } + + auto parquet_column_index = parquet_indexes[0]; + + const auto * parquet_column_descriptor = parquet_rg_metadata->schema()->Column(parquet_column_index); + + bool column_has_bloom_filter = parquet_rg_metadata->ColumnChunk(parquet_column_index)->bloom_filter_offset().has_value(); + if (!column_has_bloom_filter) + { + return nullptr; + } + + return parquet_column_descriptor; +} + ParquetBlockInputFormat::ParquetBlockInputFormat( ReadBuffer & buf, const Block & header_, @@ -559,47 +637,97 @@ void ParquetBlockInputFormat::initializeIfNeeded() return std::min(std::max(preferred_num_rows, MIN_ROW_NUM), static_cast(format_settings.parquet.max_block_size)); }; - std::unique_ptr parquet_bloom_filter_condition; - std::unordered_set filtering_columns; - if (format_settings.parquet.bloom_filter_push_down && key_condition) + std::unique_ptr key_condition_with_bloom_filter_data; + + if (key_condition) { - bf_reader = parquet::BloomFilterReader::Make(arrow_file, metadata, bf_reader_properties, nullptr); + key_condition_with_bloom_filter_data = std::make_unique(*key_condition); + + if (format_settings.parquet.bloom_filter_push_down) + { + bf_reader = parquet::BloomFilterReader::Make(arrow_file, metadata, bf_reader_properties, nullptr); - const auto parquet_conditions = keyConditionRPNToParquetBloomFilterCondition( - key_condition->getRPN(), - index_mapping, - metadata->RowGroup(0)); - parquet_bloom_filter_condition = std::make_unique(parquet_conditions, getPort().getHeader()); + auto hash_one = [&](size_t column_idx, const Field & f) -> std::optional + { + const auto * parquet_column_descriptor + = getColumnDescriptorIfBloomFilterIsPresent(metadata->RowGroup(0), index_mapping, column_idx); - filtering_columns = parquet_bloom_filter_condition->getFilteringColumnKeys(); + if (!parquet_column_descriptor) + { + return std::nullopt; + } + + return parquetTryHashField(f, parquet_column_descriptor); + }; + + auto hash_many = [&](size_t column_idx, const ColumnPtr & column) -> std::optional> + { + const auto * parquet_column_descriptor + = getColumnDescriptorIfBloomFilterIsPresent(metadata->RowGroup(0), index_mapping, column_idx); + + if (!parquet_column_descriptor) + { + return std::nullopt; + } + + auto nested_column = column; + + if (const auto & nullable_column = checkAndGetColumn(column.get())) + { + nested_column = nullable_column->getNestedColumnPtr(); + } + + return parquetTryHashColumn(nested_column.get(), parquet_column_descriptor); + }; + + key_condition_with_bloom_filter_data->prepareBloomFilterData(hash_one, hash_many); + + filtering_columns = getBloomFilterFilteringColumnKeys(key_condition_with_bloom_filter_data->getRPN()); + } } - for (int row_group = 0; row_group < num_row_groups; ++row_group) + auto skip_row_group_based_on_filters = [&](int row_group) { - if (skip_row_groups.contains(row_group)) - continue; + if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down) + { + return false; + } + + KeyCondition::ColumnIndexToBloomFilter column_index_to_bloom_filter; + + const auto & header = getPort().getHeader(); - if (parquet_bloom_filter_condition) + std::vector hyperrectangle(header.columns(), Range::createWholeUniverse()); + + if (format_settings.parquet.filter_push_down) { - const auto column_index_to_bf = buildColumnIndexToBF(*bf_reader, row_group, index_mapping, filtering_columns); + hyperrectangle = getHyperrectangleForRowGroup(*metadata, row_group, header, format_settings); + } - if (!parquet_bloom_filter_condition->mayBeTrueOnRowGroup(column_index_to_bf)) - { - continue; - } + if (format_settings.parquet.bloom_filter_push_down) + { + column_index_to_bloom_filter = buildColumnIndexToBF(*bf_reader, row_group, index_mapping, filtering_columns); } - if (format_settings.parquet.filter_push_down && key_condition - && !key_condition - ->checkInHyperrectangle( - getHyperrectangleForRowGroup(*metadata, row_group, getPort().getHeader(), format_settings), - getPort().getHeader().getDataTypes()) - .can_be_true) + bool maybe_exists = key_condition_with_bloom_filter_data->checkInHyperrectangle(hyperrectangle, getPort().getHeader().getDataTypes(), column_index_to_bloom_filter).can_be_true; + + return !maybe_exists; + }; + + for (int row_group = 0; row_group < num_row_groups; ++row_group) + { + if (skip_row_groups.contains(row_group)) continue; - if (row_group_batches.empty() || row_group_batches.back().total_bytes_compressed >= min_bytes_for_seek) + if (key_condition_with_bloom_filter_data && skip_row_group_based_on_filters(row_group)) + { + continue; + } + + // When single-threaded parsing, can prefetch row groups, so need to put all row groups in the same row_group_batch + if (row_group_batches.empty() || row_group_batches.back().total_bytes_compressed >= min_bytes_for_seek) row_group_batches.emplace_back(); row_group_batches.back().row_groups_idxs.push_back(row_group); diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp index e54d5237ae75..64d33da1d2c5 100644 --- a/src/Storages/MergeTree/KeyCondition.cpp +++ b/src/Storages/MergeTree/KeyCondition.cpp @@ -709,6 +709,32 @@ const std::unordered_map KeyConditi {"hilbertEncode", SpaceFillingCurveType::Hilbert} }; +static bool mayExistOnBloomFilter(const KeyCondition::BloomFilterData & condition_bloom_filter_data, + const KeyCondition::ColumnIndexToBloomFilter & column_index_to_column_bf) +{ + chassert(condition_bloom_filter_data.hashes_per_column.size() == condition_bloom_filter_data.key_columns.size()); + + for (auto column_index = 0u; column_index < condition_bloom_filter_data.hashes_per_column.size(); column_index++) + { + // In case bloom filter is missing for parts of the data + // (e.g. for some Parquet row groups: https://github.com/ClickHouse/ClickHouse/pull/62966#discussion_r1722361237). + if (!column_index_to_column_bf.contains(condition_bloom_filter_data.key_columns[column_index])) + { + continue; + } + + const auto & column_bf = column_index_to_column_bf.at(condition_bloom_filter_data.key_columns[column_index]); + const auto & hashes = condition_bloom_filter_data.hashes_per_column[column_index]; + + if (!column_bf->findAnyHash(hashes)) + { + return false; + } + } + + return true; +} + ActionsDAG KeyCondition::cloneASTWithInversionPushDown(ActionsDAG::NodeRawConstPtrs nodes, const ContextPtr & context) { ActionsDAG res; @@ -2838,7 +2864,8 @@ bool KeyCondition::extractPlainRanges(Ranges & ranges) const BoolMask KeyCondition::checkInHyperrectangle( const Hyperrectangle & hyperrectangle, - const DataTypes & data_types) const + const DataTypes & data_types, + const ColumnIndexToBloomFilter & column_index_to_column_bf) const { std::vector rpn_stack; @@ -2863,7 +2890,7 @@ BoolMask KeyCondition::checkInHyperrectangle( rpn_stack.emplace_back(true, true); } else if (element.function == RPNElement::FUNCTION_IN_RANGE - || element.function == RPNElement::FUNCTION_NOT_IN_RANGE) + || element.function == RPNElement::FUNCTION_NOT_IN_RANGE) { if (element.key_column >= hyperrectangle.size()) { @@ -2898,6 +2925,13 @@ BoolMask KeyCondition::checkInHyperrectangle( bool contains = element.range.containsRange(*key_range); rpn_stack.emplace_back(intersects, !contains); + + // we don't create bloom_filter_data if monotonic_functions_chain is present + if (rpn_stack.back().can_be_true && element.bloom_filter_data && element.monotonic_functions_chain.empty()) + { + rpn_stack.back().can_be_true = mayExistOnBloomFilter(*element.bloom_filter_data, column_index_to_column_bf); + } + if (element.function == RPNElement::FUNCTION_NOT_IN_RANGE) rpn_stack.back() = !rpn_stack.back(); } @@ -3031,6 +3065,12 @@ BoolMask KeyCondition::checkInHyperrectangle( throw Exception(ErrorCodes::LOGICAL_ERROR, "Set for IN is not created yet"); rpn_stack.emplace_back(element.set_index->checkInRange(hyperrectangle, data_types, single_point)); + + if (rpn_stack.back().can_be_true && element.bloom_filter_data) + { + rpn_stack.back().can_be_true = mayExistOnBloomFilter(*element.bloom_filter_data, column_index_to_column_bf); + } + if (element.function == RPNElement::FUNCTION_NOT_IN_SET) rpn_stack.back() = !rpn_stack.back(); } @@ -3076,6 +3116,85 @@ BoolMask KeyCondition::checkInHyperrectangle( return rpn_stack[0]; } +void KeyCondition::prepareBloomFilterData(std::function(size_t column_idx, const Field &)> hash_one, + std::function>(size_t column_idx, const ColumnPtr &)> hash_many) +{ + for (auto & rpn_element : rpn) + { + // this would be a problem for `where negate(x) = -58`. + // It would perform a bf search on `-58`, and possibly miss row groups containing this data. + if (!rpn_element.monotonic_functions_chain.empty()) + { + continue; + } + + KeyCondition::BloomFilterData::HashesForColumns hashes; + + if (rpn_element.function == RPNElement::FUNCTION_IN_RANGE + || rpn_element.function == RPNElement::FUNCTION_NOT_IN_RANGE) + { + // Only FUNCTION_EQUALS is supported and for that extremes need to be the same + if (rpn_element.range.left != rpn_element.range.right) + { + continue; + } + + auto hashed_value = hash_one(rpn_element.key_column, rpn_element.range.left); + + if (!hashed_value) + { + continue; + } + + hashes.emplace_back(std::vector{*hashed_value}); + + std::vector key_columns_for_element; + key_columns_for_element.emplace_back(rpn_element.key_column); + + rpn_element.bloom_filter_data = KeyCondition::BloomFilterData {std::move(hashes), std::move(key_columns_for_element)}; + } + else if (rpn_element.function == RPNElement::FUNCTION_IN_SET + || rpn_element.function == RPNElement::FUNCTION_NOT_IN_SET) + { + const auto & set_index = rpn_element.set_index; + const auto & ordered_set = set_index->getOrderedSet(); + const auto & indexes_mapping = set_index->getIndexesMapping(); + + std::vector key_columns_for_element; + + for (auto i = 0u; i < ordered_set.size(); i++) + { + const auto & set_column = ordered_set[i]; + + auto hashes_for_column_opt = hash_many(indexes_mapping[i].key_index, set_column); + + if (!hashes_for_column_opt) + { + continue; + } + + auto & hashes_for_column = *hashes_for_column_opt; + + if (hashes_for_column.empty()) + { + continue; + } + + hashes.emplace_back(hashes_for_column); + + key_columns_for_element.push_back(indexes_mapping[i].key_index); + } + + if (hashes.empty()) + { + continue; + } + + rpn_element.bloom_filter_data = {std::move(hashes), std::move(key_columns_for_element)}; + } + } +} + bool KeyCondition::mayBeTrueInRange( size_t used_key_size, const FieldRef * left_keys, diff --git a/src/Storages/MergeTree/KeyCondition.h b/src/Storages/MergeTree/KeyCondition.h index 800191c4fb0a..6a7725adebe0 100644 --- a/src/Storages/MergeTree/KeyCondition.h +++ b/src/Storages/MergeTree/KeyCondition.h @@ -49,10 +49,26 @@ class KeyCondition const ExpressionActionsPtr & key_expr, bool single_point_ = false); + struct BloomFilterData + { + using HashesForColumns = std::vector>; + HashesForColumns hashes_per_column; + std::vector key_columns; + }; + + struct BloomFilter + { + virtual ~BloomFilter() = default; + + virtual bool findAnyHash(const std::vector & hashes) = 0; + }; + + using ColumnIndexToBloomFilter = std::unordered_map>; /// Whether the condition and its negation are feasible in the direct product of single column ranges specified by `hyperrectangle`. BoolMask checkInHyperrectangle( const Hyperrectangle & hyperrectangle, - const DataTypes & data_types) const; + const DataTypes & data_types, + const ColumnIndexToBloomFilter & column_index_to_column_bf = {}) const; /// Whether the condition and its negation are (independently) feasible in the key range. /// left_key and right_key must contain all fields in the sort_descr in the appropriate order. @@ -207,6 +223,8 @@ class KeyCondition Hyperrectangle space_filling_curve_args_hyperrectangle; MonotonicFunctionsChain monotonic_functions_chain; + + std::optional bloom_filter_data; }; using RPN = std::vector; @@ -220,6 +238,11 @@ class KeyCondition bool isRelaxed() const { return relaxed; } + bool isSinglePoint() const { return single_point; } + + void prepareBloomFilterData(std::function(size_t column_idx, const Field &)> hash_one, + std::function>(size_t column_idx, const ColumnPtr &)> hash_many); + private: BoolMask checkInRange( size_t used_key_size, @@ -359,6 +382,7 @@ class KeyCondition }; using SpaceFillingCurveDescriptions = std::vector; SpaceFillingCurveDescriptions key_space_filling_curves; + void getAllSpaceFillingCurves(); /// Array joined column names diff --git a/tests/queries/0_stateless/03261_test_merge_parquet_bloom_filter_minmax_stats.reference b/tests/queries/0_stateless/03261_test_merge_parquet_bloom_filter_minmax_stats.reference new file mode 100644 index 000000000000..39ef9a1ef048 --- /dev/null +++ b/tests/queries/0_stateless/03261_test_merge_parquet_bloom_filter_minmax_stats.reference @@ -0,0 +1,16 @@ +{ + "data": [], + "rows": 0, + "statistics": { + "rows_read": 4, + "bytes_read": 56 + } +} +{ + "data": [], + "rows": 0, + "statistics": { + "rows_read": 0, + "bytes_read": 0 + } +} diff --git a/tests/queries/0_stateless/03261_test_merge_parquet_bloom_filter_minmax_stats.sh b/tests/queries/0_stateless/03261_test_merge_parquet_bloom_filter_minmax_stats.sh new file mode 100755 index 000000000000..3e93d4a1ed23 --- /dev/null +++ b/tests/queries/0_stateless/03261_test_merge_parquet_bloom_filter_minmax_stats.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash +# Tags: no-ubsan, no-fasttest + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + + +USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}') + +WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}" + +mkdir -p "${WORKING_DIR}" + +DATA_FILE="${CUR_DIR}/data_parquet/integers_1_5_no_3_bf_minmax.parquet" + +DATA_FILE_USER_PATH="${WORKING_DIR}/integers_1to5_no_3_bf_minmax.parquet" + +cp ${DATA_FILE} ${DATA_FILE_USER_PATH} + +# Prior to this PR, bloom filter and minmax were evaluated separately. +# This was sub-optimal for conditions like `x = 3 or x > 5` where data is [1, 2, 4, 5]. +# Bloom filter is not able to handle greater than operations. Therefore, it can't evaluate x > 5. Even though it can tell +# `3` is not in the set by evaluating `x = 3`, it can't discard the row group because of the `or` condition. +# On the other hand, min max can handle both. It'll evaluate x = 3 to true (because it is within the range) and the latter to false +# Therefore, bloom filter would determine `false or true` and minmax would determine `true or false`. Resulting in true. + +# Without bf to prove nothing is returned, but rows had to be read +${CLICKHOUSE_CLIENT} --query="select * from file('${DATA_FILE_USER_PATH}', Parquet) WHERE int8 = 3 or int8 > 5 FORMAT Json SETTINGS input_format_parquet_filter_push_down=true, input_format_parquet_bloom_filter_push_down=false;" | jq 'del(.meta,.statistics.elapsed)' + +# Since both structures are now evaluated together, the row group should be skipped +${CLICKHOUSE_CLIENT} --query="select * from file('${DATA_FILE_USER_PATH}', Parquet) WHERE int8 = 3 or int8 > 5 FORMAT Json SETTINGS input_format_parquet_filter_push_down=true, input_format_parquet_bloom_filter_push_down=true;" | jq 'del(.meta,.statistics.elapsed)' diff --git a/tests/queries/0_stateless/data_parquet/integers_1_5_no_3_bf_minmax.parquet b/tests/queries/0_stateless/data_parquet/integers_1_5_no_3_bf_minmax.parquet new file mode 100644 index 000000000000..1790322cdf69 Binary files /dev/null and b/tests/queries/0_stateless/data_parquet/integers_1_5_no_3_bf_minmax.parquet differ