Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions be/src/olap/rowset/segment_v2/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ class BloomFilter {
return hash_code;
}

// Computes a 64-bit hash of the `size` bytes starting at `buf`.
//
// Only HASH_MURMUR3_X64_64 is handled: the bytes are hashed with
// murmur_hash3_x64_64 using DEFAULT_SEED and the resulting code is returned.
// Any other strategy produces Status::InvalidArgument.
static Result<uint64_t> hash(const char* buf, uint32_t size, HashStrategyPB strategy) {
    // Reject unsupported strategies up front so the hashing path stays flat.
    if (strategy != HASH_MURMUR3_X64_64) {
        return Status::InvalidArgument("invalid strategy:{}", strategy);
    }

    uint64_t digest;
    murmur_hash3_x64_64(buf, size, DEFAULT_SEED, &digest);
    return digest;
}

virtual void add_bytes(const char* buf, uint32_t size) {
if (buf == nullptr) {
*_has_null = true;
Expand Down
27 changes: 16 additions & 11 deletions be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ class BloomFilterIndexWriterImpl : public BloomFilterIndexWriter {
for (int i = 0; i < count; ++i) {
if (_values.find(*v) == _values.end()) {
if constexpr (_is_slice_type()) {
CppType new_value;
RETURN_IF_CATCH_EXCEPTION(_type_info->deep_copy(&new_value, v, &_arena));
_values.insert(new_value);
const auto* s = reinterpret_cast<const Slice*>(v);
auto hash =
DORIS_TRY(BloomFilter::hash(s->data, s->size, _bf_options.strategy));
_hash_values.insert(hash);
} else if constexpr (_is_int128()) {
int128_t new_value;
memcpy(&new_value, v, sizeof(PackedInt128));
Expand All @@ -105,25 +106,28 @@ class BloomFilterIndexWriterImpl : public BloomFilterIndexWriter {
Status flush() override {
std::unique_ptr<BloomFilter> bf;
RETURN_IF_ERROR(BloomFilter::create(BLOCK_BLOOM_FILTER, &bf));
RETURN_IF_ERROR(bf->init(_values.size(), _bf_options.fpp, _bf_options.strategy));
bf->set_has_null(_has_null);
for (auto& v : _values) {
if constexpr (_is_slice_type()) {
Slice* s = (Slice*)&v;
bf->add_bytes(s->data, s->size);
} else {
if constexpr (_is_slice_type()) {
RETURN_IF_ERROR(bf->init(_hash_values.size(), _bf_options.fpp, _bf_options.strategy));
for (const auto& h : _hash_values) {
bf->add_hash(h);
}
} else {
RETURN_IF_ERROR(bf->init(_values.size(), _bf_options.fpp, _bf_options.strategy));
for (auto& v : _values) {
bf->add_bytes((char*)&v, sizeof(CppType));
}
}
bf->set_has_null(_has_null);
_bf_buffer_size += bf->size();
_bfs.push_back(std::move(bf));
_values.clear();
_hash_values.clear();
_has_null = false;
return Status::OK();
}

Status finish(io::FileWriter* file_writer, ColumnIndexMetaPB* index_meta) override {
if (_values.size() > 0) {
if (_values.size() > 0 || !_hash_values.empty()) {
RETURN_IF_ERROR(flush());
}
index_meta->set_type(BLOOM_FILTER_INDEX);
Expand Down Expand Up @@ -172,6 +176,7 @@ class BloomFilterIndexWriterImpl : public BloomFilterIndexWriter {
// distinct values
ValueDict _values;
std::vector<std::unique_ptr<BloomFilter>> _bfs;
std::set<uint64_t> _hash_values;
};

} // namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,12 @@ void test_bloom_filter_index_reader_writer_template(
}
// test nullptr
EXPECT_TRUE(bf->test_bytes(nullptr, 1));

if (is_slice_type) {
Slice* value = (Slice*)(not_exist_value);
EXPECT_FALSE(bf->test_bytes(value->data, value->size));
} else {
EXPECT_FALSE(bf->test_bytes((char*)not_exist_value, sizeof(CppType)));
}
delete reader;
}
}
Expand Down
Loading