diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp
index d3f145a02a5327..0a577a122f1048 100644
--- a/be/src/exec/hash_table.cpp
+++ b/be/src/exec/hash_table.cpp
@@ -40,8 +40,11 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
         _initial_seed(initial_seed),
         _node_byte_size(sizeof(Node) + sizeof(Tuple*) * _num_build_tuples),
         _num_filled_buckets(0),
-        _nodes(NULL),
+        _current_nodes(nullptr),
         _num_nodes(0),
+        _current_capacity(num_buckets),
+        _current_used(0),
+        _total_capacity(num_buckets),
         _exceeded_limit(false),
         _mem_tracker(mem_tracker),
         _mem_limit_exceeded(false) {
@@ -61,13 +64,15 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
     memset(_expr_values_buffer, 0, sizeof(uint8_t) * _results_buffer_size);
     _expr_value_null_bits = new uint8_t[_build_expr_ctxs.size()];

-    _nodes_capacity = 1024;
-    _nodes = reinterpret_cast<uint8_t*>(malloc(_nodes_capacity * _node_byte_size));
-    memset(_nodes, 0, _nodes_capacity * _node_byte_size);
+    _alloc_list.reserve(10);
+    _current_nodes = reinterpret_cast<uint8_t*>(malloc(_current_capacity * _node_byte_size));
+    // TODO: remove memset later
+    memset(_current_nodes, 0, _current_capacity * _node_byte_size);
+    _alloc_list.push_back(_current_nodes);

-    _mem_tracker->Consume(_nodes_capacity * _node_byte_size);
+    _mem_tracker->Consume(_current_capacity * _node_byte_size);
     if (_mem_tracker->limit_exceeded()) {
-        mem_limit_exceeded(_nodes_capacity * _node_byte_size);
+        mem_limit_exceeded(_current_capacity * _node_byte_size);
     }
 }

@@ -77,8 +82,10 @@ void HashTable::close() {
     // TODO: use tr1::array?
     delete[] _expr_values_buffer;
     delete[] _expr_value_null_bits;
-    free(_nodes);
-    _mem_tracker->Release(_nodes_capacity * _node_byte_size);
+    for (auto ptr : _alloc_list) {
+        free(ptr);
+    }
+    _mem_tracker->Release(_total_capacity * _node_byte_size);
     _mem_tracker->Release(_buckets.size() * sizeof(Bucket));
 }

@@ -199,11 +206,10 @@ void HashTable::resize_buckets(int64_t num_buckets) {
         Bucket* bucket = &_buckets[i];
         Bucket* sister_bucket = &_buckets[i + old_num_buckets];
         Node* last_node = NULL;
-        int node_idx = bucket->_node_idx;
+        Node* node = bucket->_node;

-        while (node_idx != -1) {
-            Node* node = get_node(node_idx);
-            int64_t next_idx = node->_next_idx;
+        while (node != nullptr) {
+            Node* next_node = node->_next;
             uint32_t hash = node->_hash;

             bool node_must_move = true;
@@ -219,12 +225,12 @@ void HashTable::resize_buckets(int64_t num_buckets) {
             }

             if (node_must_move) {
-                move_node(bucket, move_to, node_idx, node, last_node);
+                move_node(bucket, move_to, node, last_node);
             } else {
                 last_node = node;
             }

-            node_idx = next_idx;
+            node = next_node;
         }
     }

@@ -233,19 +239,19 @@ void HashTable::resize_buckets(int64_t num_buckets) {
 }

 void HashTable::grow_node_array() {
-    int64_t old_size = _nodes_capacity * _node_byte_size;
-    _nodes_capacity = _nodes_capacity + _nodes_capacity / 2;
-    int64_t new_size = _nodes_capacity * _node_byte_size;
-
-    uint8_t* new_nodes = reinterpret_cast<uint8_t*>(malloc(new_size));
-    memset(new_nodes, 0, new_size);
-    memcpy(new_nodes, _nodes, old_size);
-    free(_nodes);
-    _nodes = new_nodes;
-
-    _mem_tracker->Consume(new_size - old_size);
+    _current_capacity = _total_capacity / 2;
+    _total_capacity += _current_capacity;
+    int64_t alloc_size = _current_capacity * _node_byte_size;
+    _current_nodes = reinterpret_cast<uint8_t*>(malloc(alloc_size));
+    _current_used = 0;
+    // TODO: remove memset later
+    memset(_current_nodes, 0, alloc_size);
+    // add _current_nodes to the allocation list so close() can free it
+    _alloc_list.push_back(_current_nodes);
+
+    _mem_tracker->Consume(alloc_size);
     if (_mem_tracker->limit_exceeded()) {
-        mem_limit_exceeded(new_size - old_size);
+        mem_limit_exceeded(alloc_size);
     }
 }

@@ -262,29 +268,27 @@ std::string HashTable::debug_string(bool skip_empty, const RowDescriptor* desc)
     ss << std::endl;

     for (int i = 0; i < _buckets.size(); ++i) {
-        int64_t node_idx = _buckets[i]._node_idx;
+        Node* node = _buckets[i]._node;
         bool first = true;

-        if (skip_empty && node_idx == -1) {
+        if (skip_empty && node == nullptr) {
             continue;
         }

         ss << i << ": ";

-        while (node_idx != -1) {
-            Node* node = get_node(node_idx);
-
+        while (node != nullptr) {
             if (!first) {
                 ss << ",";
             }

             if (desc == NULL) {
-                ss << node_idx << "(" << (void*)node->data() << ")";
+                ss << node->_hash << "(" << (void*)node->data() << ")";
             } else {
                 ss << (void*)node->data() << " " << node->data()->to_string(*desc);
             }

-            node_idx = node->_next_idx;
+            node = node->_next;
             first = false;
         }
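Taken together, the hash_table.cpp changes replace the single realloc'd node array with a list of malloc'd buffers. The motivation: once buckets and chains store `Node*` values instead of indices (see the header changes below), a realloc that moves the block would invalidate every pointer in the table, so extra capacity has to come from fresh chunks that never move. For orientation, here is a minimal standalone sketch of that scheme; the class and member names are illustrative, not part of the patch:

```cpp
#include <cstdint>
#include <cstdlib>
#include <vector>

// Sketch of the chunked growth scheme adopted by grow_node_array():
// every grow allocates a brand-new chunk, so previously handed-out
// node pointers stay valid for the lifetime of the arena.
class ChunkedNodeArena {
public:
    ChunkedNodeArena(int64_t node_size, int64_t initial_nodes)
            : _node_size(node_size),
              _current_capacity(initial_nodes),
              _total_capacity(initial_nodes) {
        _current = static_cast<uint8_t*>(malloc(_current_capacity * _node_size));
        _chunks.push_back(_current);
    }

    ~ChunkedNodeArena() {
        for (uint8_t* chunk : _chunks) {
            free(chunk); // mirrors HashTable::close()
        }
    }

    // Returns a pointer that remains stable: chunks are never moved.
    uint8_t* alloc_node() {
        if (_used == _current_capacity) {
            grow();
        }
        return _current + _node_size * _used++;
    }

private:
    void grow() {
        // Same arithmetic as grow_node_array(): the new chunk holds half of
        // the total so far, i.e. capacity grows by 1.5x per step
        // (assumes an initial capacity of at least 2).
        _current_capacity = _total_capacity / 2;
        _total_capacity += _current_capacity;
        _current = static_cast<uint8_t*>(malloc(_current_capacity * _node_size));
        _used = 0;
        _chunks.push_back(_current);
    }

    const int64_t _node_size;
    uint8_t* _current = nullptr;
    int64_t _used = 0;
    int64_t _current_capacity;
    int64_t _total_capacity;
    std::vector<uint8_t*> _chunks;
};
```

Because each step adds half of the current total, capacity still grows geometrically; ten entries in `_alloc_list` (the initial buffer plus nine grows) already cover roughly a 38x increase over the initial capacity, which is presumably why the constructor reserves 10 slots.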
diff --git a/be/src/exec/hash_table.h b/be/src/exec/hash_table.h
index 511c61e817b281..caaf01bf561ad4 100644
--- a/be/src/exec/hash_table.h
+++ b/be/src/exec/hash_table.h
@@ -23,6 +23,7 @@

 #include "codegen/doris_ir.h"
 #include "common/logging.h"
+#include "common/object_pool.h"
 #include "util/hash_util.hpp"

 namespace doris {
@@ -142,7 +143,7 @@ class HashTable {

     // Returns the number of bytes allocated to the hash table
     int64_t byte_size() const {
-        return _node_byte_size * _nodes_capacity + sizeof(Bucket) * _buckets.size();
+        return _node_byte_size * _total_capacity + sizeof(Bucket) * _buckets.size();
     }

     // Returns the results of the exprs at 'expr_idx' evaluated over the last row
@@ -172,7 +173,7 @@ class HashTable {
     // stl-like iterator interface.
     class Iterator {
     public:
-        Iterator() : _table(NULL), _bucket_idx(-1), _node_idx(-1) {}
+        Iterator() : _table(NULL), _bucket_idx(-1), _node(nullptr) {}

         // Iterates to the next element. In the case where the iterator was
         // from a Find, this will lazily evaluate that bucket, only returning
@@ -182,54 +183,52 @@ class HashTable {

         // Returns the current row or NULL if at end.
         TupleRow* get_row() {
-            if (_node_idx == -1) {
+            if (_node == nullptr) {
                 return NULL;
             }

-            return _table->get_node(_node_idx)->data();
+            return _node->data();
         }

         // Returns Hash
-        uint32_t get_hash() { return _table->get_node(_node_idx)->_hash; }
+        uint32_t get_hash() { return _node->_hash; }

         // Returns if the iterator is at the end
-        bool has_next() { return _node_idx != -1; }
+        bool has_next() { return _node != nullptr; }

         // Returns true if this iterator is at the end, i.e. get_row() cannot be called.
-        bool at_end() { return _node_idx == -1; }
+        bool at_end() { return _node == nullptr; }

         // Sets as matched the node currently pointed by the iterator. The iterator
         // cannot be AtEnd().
         void set_matched() {
             DCHECK(!at_end());
-            Node* node = _table->get_node(_node_idx);
-            node->matched = true;
+            _node->matched = true;
         }

         bool matched() {
             DCHECK(!at_end());
-            Node* node = _table->get_node(_node_idx);
-            return node->matched;
+            return _node->matched;
         }

         bool operator==(const Iterator& rhs) {
-            return _bucket_idx == rhs._bucket_idx && _node_idx == rhs._node_idx;
+            return _bucket_idx == rhs._bucket_idx && _node == rhs._node;
         }

         bool operator!=(const Iterator& rhs) {
-            return _bucket_idx != rhs._bucket_idx || _node_idx != rhs._node_idx;
+            return _bucket_idx != rhs._bucket_idx || _node != rhs._node;
         }

     private:
         friend class HashTable;

-        Iterator(HashTable* table, int bucket_idx, int64_t node, uint32_t hash)
-                : _table(table), _bucket_idx(bucket_idx), _node_idx(node), _scan_hash(hash) {}
+        Iterator(HashTable* table, int bucket_idx, Node* node, uint32_t hash)
+                : _table(table), _bucket_idx(bucket_idx), _node(node), _scan_hash(hash) {}

         HashTable* _table;
         // Current bucket idx
         int64_t _bucket_idx;
-        // Current node idx (within current bucket)
-        int64_t _node_idx;
+        // Current node (within current bucket)
+        Node* _node;
         // cached hash value for the row passed to find()
         uint32_t _scan_hash;
     };

@@ -241,11 +240,11 @@ class HashTable {
     // Header portion of a Node. The node data (TupleRow) is right after the
     // node memory to maximize cache hits.
     struct Node {
-        int64_t _next_idx; // chain to next node for collisions
-        uint32_t _hash;    // Cache of the hash for _data
+        Node* _next;    // chain to next node for collisions
+        uint32_t _hash; // Cache of the hash for _data
         bool matched;

-        Node() : _next_idx(-1), _hash(-1), matched(false) {}
+        Node() : _next(nullptr), _hash(-1), matched(false) {}

         TupleRow* data() {
             uint8_t* mem = reinterpret_cast<uint8_t*>(this);
@@ -255,22 +254,14 @@ class HashTable {
     };

     struct Bucket {
-        int64_t _node_idx;
-
-        Bucket() { _node_idx = -1; }
+        Bucket() { _node = nullptr; }
+        Node* _node;
     };

     // Returns the next non-empty bucket and updates idx to be the index of that bucket.
     // If there are no more buckets, returns NULL and sets idx to -1
     Bucket* next_bucket(int64_t* bucket_idx);

-    // Returns node at idx. Tracking structures do not use pointers since they will
-    // change as the HashTable grows.
-    Node* get_node(int64_t idx) {
-        DCHECK_NE(idx, -1);
-        return reinterpret_cast<Node*>(_nodes + _node_byte_size * idx);
-    }
-
     // Resize the hash table to 'num_buckets'
     void resize_buckets(int64_t num_buckets);

@@ -279,12 +270,11 @@ class HashTable {

-    // Chains the node at 'node_idx' to 'bucket'. Nodes in a bucket are chained
+    // Chains 'node' to 'bucket'. Nodes in a bucket are chained
     // as a linked list; this places the new node at the beginning of the list.
-    void add_to_bucket(Bucket* bucket, int64_t node_idx, Node* node);
+    void add_to_bucket(Bucket* bucket, Node* node);

     // Moves a node from one bucket to another. 'previous_node' refers to the
     // node (if any) that's chained before this node in from_bucket's linked list.
-    void move_node(Bucket* from_bucket, Bucket* to_bucket, int64_t node_idx, Node* node,
-                   Node* previous_node);
+    void move_node(Bucket* from_bucket, Bucket* to_bucket, Node* node, Node* previous_node);

     // Evaluate the exprs over row and cache the results in '_expr_values_buffer'.
     // Returns whether any expr evaluated to NULL
@@ -354,14 +344,16 @@ class HashTable {
     const int _node_byte_size;
     // Number of non-empty buckets. Used to determine when to grow and rehash
     int64_t _num_filled_buckets;
-    // Memory to store node data. This is not allocated from a pool to take advantage
-    // of realloc.
-    // TODO: integrate with mem pools
-    uint8_t* _nodes;
+    // Buffer that new node data is currently carved out of.
+    uint8_t* _current_nodes;
    // number of nodes stored (i.e. size of hash table)
     int64_t _num_nodes;
-    // max number of nodes that can be stored in '_nodes' before realloc
-    int64_t _nodes_capacity;
+    // number of nodes the current buffer can hold
+    int64_t _current_capacity;
+    // number of nodes already carved out of the current buffer
+    int64_t _current_used;
+    // total node capacity across all allocated buffers
+    int64_t _total_capacity;

     bool _exceeded_limit; // true if any of _mem_trackers[].limit_exceeded()

@@ -395,6 +387,8 @@ class HashTable {
     // Use bytes instead of bools to be compatible with llvm. This address must
     // not change once allocated.
     uint8_t* _expr_value_null_bits;
+    // every node buffer allocated so far; each entry is freed in close()
+    std::vector<uint8_t*> _alloc_list;
 };

 } // namespace doris
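In the header, nodes and buckets are now defined purely in terms of pointers, and `get_node()` disappears because a node's address is its identity. One detail the diff only shows truncated is the memory layout that `Node::data()` and `_node_byte_size` rely on: each allocation is a fixed-size `Node` header immediately followed by the row's `Tuple*` array. Below is a sketch of that layout, with an assumed completion of the cut-off `data()` body (the `mem + sizeof(Node)` line is a reconstruction, but it is what `_node_byte_size = sizeof(Node) + sizeof(Tuple*) * _num_build_tuples` implies):

```cpp
#include <cstdint>

struct Tuple;
struct TupleRow;

// One hash-table allocation of _node_byte_size bytes:
//
//   [ Node header: _next | _hash | matched ][ payload: Tuple* x _num_build_tuples ]
//   <----------- sizeof(Node) -------------><-- sizeof(Tuple*) * _num_build_tuples -->
//
struct Node {
    Node* _next = nullptr; // collision chain
    uint32_t _hash = 0;    // cached hash of the row
    bool matched = false;

    // The payload lives directly behind the header, so no extra pointer
    // needs to be stored to reach it.
    TupleRow* data() {
        uint8_t* mem = reinterpret_cast<uint8_t*>(this);
        return reinterpret_cast<TupleRow*>(mem + sizeof(Node));
    }
};
```

Keeping header and payload in a single allocation is what the comment "right after the node memory to maximize cache hits" refers to: walking a chain touches one contiguous region per node instead of two separate allocations.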
diff --git a/be/src/exec/hash_table.hpp b/be/src/exec/hash_table.hpp
index 70f0e38f7f45e6..ae2710d64500db 100644
--- a/be/src/exec/hash_table.hpp
+++ b/be/src/exec/hash_table.hpp
@@ -33,16 +33,14 @@ inline HashTable::Iterator HashTable::find(TupleRow* probe_row, bool probe) {
     int64_t bucket_idx = hash & (_num_buckets - 1);
     Bucket* bucket = &_buckets[bucket_idx];
-    int64_t node_idx = bucket->_node_idx;
-
-    while (node_idx != -1) {
-        Node* node = get_node(node_idx);
+    Node* node = bucket->_node;

+    while (node != nullptr) {
         if (node->_hash == hash && equals(node->data())) {
-            return Iterator(this, bucket_idx, node_idx, hash);
+            return Iterator(this, bucket_idx, node, hash);
         }

-        node_idx = node->_next_idx;
+        node = node->_next;
     }

     return end();
@@ -53,7 +51,7 @@ inline HashTable::Iterator HashTable::begin() {
     Bucket* bucket = next_bucket(&bucket_idx);

     if (bucket != NULL) {
-        return Iterator(this, bucket_idx, bucket->_node_idx, 0);
+        return Iterator(this, bucket_idx, bucket->_node, 0);
     }

     return end();
@@ -63,7 +61,7 @@ inline HashTable::Bucket* HashTable::next_bucket(int64_t* bucket_idx) {
     ++*bucket_idx;

     for (; *bucket_idx < _num_buckets; ++*bucket_idx) {
-        if (_buckets[*bucket_idx]._node_idx != -1) {
+        if (_buckets[*bucket_idx]._node != nullptr) {
             return &_buckets[*bucket_idx];
         }
     }

@@ -82,77 +80,78 @@ inline void HashTable::insert_impl(TupleRow* row) {
     uint32_t hash = hash_current_row();
     int64_t bucket_idx = hash & (_num_buckets - 1);

-    if (_num_nodes == _nodes_capacity) {
+    if (_current_used == _current_capacity) {
         grow_node_array();
     }
+    // carve the next node out of the current buffer
+    Node* node = reinterpret_cast<Node*>(_current_nodes + _node_byte_size * _current_used++);

-    Node* node = get_node(_num_nodes);
     TupleRow* data = node->data();
     node->_hash = hash;
     memcpy(data, row, sizeof(Tuple*) * _num_build_tuples);
-    add_to_bucket(&_buckets[bucket_idx], _num_nodes, node);
+    add_to_bucket(&_buckets[bucket_idx], node);
     ++_num_nodes;
 }

-inline void HashTable::add_to_bucket(Bucket* bucket, int64_t node_idx, Node* node) {
-    if (bucket->_node_idx == -1) {
+inline void HashTable::add_to_bucket(Bucket* bucket, Node* node) {
+    if (bucket->_node == nullptr) {
         ++_num_filled_buckets;
     }

-    node->_next_idx = bucket->_node_idx;
-    bucket->_node_idx = node_idx;
+    node->_next = bucket->_node;
+    bucket->_node = node;
 }

-inline void HashTable::move_node(Bucket* from_bucket, Bucket* to_bucket,
-                                 int64_t node_idx, Node* node, Node* previous_node) {
-    int64_t next_idx = node->_next_idx;
+inline void HashTable::move_node(Bucket* from_bucket, Bucket* to_bucket, Node* node,
+                                 Node* previous_node) {
+    Node* next_node = node->_next;

     if (previous_node != NULL) {
-        previous_node->_next_idx = next_idx;
+        previous_node->_next = next_node;
     } else {
         // Update bucket directly
-        from_bucket->_node_idx = next_idx;
+        from_bucket->_node = next_node;

-        if (next_idx == -1) {
+        if (next_node == nullptr) {
             --_num_filled_buckets;
         }
     }

-    add_to_bucket(to_bucket, node_idx, node);
+    add_to_bucket(to_bucket, node);
 }

-template<bool check_match>
+template <bool check_match>
 inline void HashTable::Iterator::next() {
     if (_bucket_idx == -1) {
         return;
     }

     // TODO: this should prefetch the next tuplerow
-    Node* node = _table->get_node(_node_idx);
+    Node* node = _node;

     // Iterator is not from a full table scan, evaluate equality now. Only the current
     // bucket needs to be scanned. '_expr_values_buffer' contains the results
     // for the current probe row.
     if (check_match) {
         // TODO: this should prefetch the next node
-        int64_t next_idx = node->_next_idx;
+        Node* next_node = node->_next;

-        while (next_idx != -1) {
-            node = _table->get_node(next_idx);
+        while (next_node != nullptr) {
+            node = next_node;

             if (node->_hash == _scan_hash && _table->equals(node->data())) {
-                _node_idx = next_idx;
+                _node = next_node;
                 return;
             }

-            next_idx = node->_next_idx;
+            next_node = node->_next;
         }

         *this = _table->end();
     } else {
         // Move onto the next chained node
-        if (node->_next_idx != -1) {
-            _node_idx = node->_next_idx;
+        if (node->_next != nullptr) {
+            _node = node->_next;
             return;
         }

@@ -161,13 +160,13 @@ inline void HashTable::Iterator::next() {

         if (bucket == NULL) {
             _bucket_idx = -1;
-            _node_idx = -1;
+            _node = nullptr;
         } else {
-            _node_idx = bucket->_node_idx;
+            _node = bucket->_node;
         }
     }
 }

-}
+} // namespace doris

 #endif
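The inline code above leans on two invariants that are easy to miss. First, `hash & (_num_buckets - 1)` is only equivalent to `hash % _num_buckets` when the bucket count is a power of two, which is why bucket counts are kept at powers of two across resizes. Second, `add_to_bucket()` is O(1) because it pushes at the head of the chain. A self-contained toy that demonstrates both (`ToyNode` and the hash values are hypothetical, not Doris types):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct ToyNode {
    ToyNode* next = nullptr;
    uint32_t hash = 0;
    int value = 0;
};

int main() {
    const int64_t num_buckets = 8; // must be a power of two for the mask trick
    std::vector<ToyNode*> buckets(num_buckets, nullptr);
    std::vector<ToyNode> storage(3); // sized up front so the nodes never move

    uint32_t hashes[3] = {5, 13, 21}; // all congruent to 5 mod 8 -> one chain
    for (int i = 0; i < 3; ++i) {
        ToyNode* node = &storage[i];
        node->hash = hashes[i];
        node->value = i;
        int64_t idx = node->hash & (num_buckets - 1); // same as hash % num_buckets
        node->next = buckets[idx];                    // head insertion, O(1)
        buckets[idx] = node;
    }

    // All three nodes collide into bucket 5; the newest insert is the head,
    // exactly the order find()/Iterator::next() will walk the chain in.
    assert(buckets[5] != nullptr && buckets[5]->value == 2);
    assert(buckets[5]->next->value == 1);
    assert(buckets[5]->next->next->value == 0);
    return 0;
}
```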
diff --git a/be/src/runtime/data_stream_recvr.hpp b/be/src/runtime/data_stream_recvr.hpp
deleted file mode 100644
index 0348134f3f8fe6..00000000000000
--- a/be/src/runtime/data_stream_recvr.hpp
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_QUERY_BE_RUNTIME_DATA_STREAM_RECVR_H
-#define DORIS_BE_SRC_QUERY_BE_RUNTIME_DATA_STREAM_RECVR_H
-
-#include "runtime/data_stream_mgr.h"
-
-namespace doris {
-
-class DataStreamMgr;
-
-// Single receiver of an m:n data stream.
-// Incoming row batches are routed to destinations based on the provided
-// partitioning specification.
-// Receivers are created via DataStreamMgr::CreateRecvr().
-class DataStreamRecvr {
-public:
-    // deregister from _mgr
-    ~DataStreamRecvr() {
-        // TODO: log error msg
-        _mgr->deregister_recvr(_cb->fragment_instance_id(), _cb->dest_node_id());
-    }
-
-    // Returns next row batch in data stream; blocks if there aren't any.
-    // Returns NULL if eos (subsequent calls will not return any more batches).
-    // Sets 'is_cancelled' to true if receiver fragment got cancelled, otherwise false.
-    // The caller owns the batch.
-    // TODO: error handling
-    RowBatch* get_batch(bool* is_cancelled) {
-        return _cb->get_batch(is_cancelled);
-    }
-
-    RuntimeProfile* profile() {
-        return _cb->profile();
-    }
-
-private:
-    friend class DataStreamMgr;
-    DataStreamMgr* _mgr;
-    boost::shared_ptr<DataStreamMgr::StreamControlBlock> _cb;
-
-    DataStreamRecvr(DataStreamMgr* mgr,
-                    boost::shared_ptr<DataStreamMgr::StreamControlBlock> cb)
-        : _mgr(mgr), _cb(cb) {}
-};
-
-}
-
-#endif
diff --git a/be/test/exec/CMakeLists.txt b/be/test/exec/CMakeLists.txt
index a01330e8561bcc..8d2bb42cac7c10 100644
--- a/be/test/exec/CMakeLists.txt
+++ b/be/test/exec/CMakeLists.txt
@@ -24,7 +24,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test/exec")
 # TODO: why is this test disabled?
 #ADD_BE_TEST(new_olap_scan_node_test)
 #ADD_BE_TEST(pre_aggregation_node_test)
-#ADD_BE_TEST(hash_table_test)
+ADD_BE_TEST(hash_table_test)
 # ADD_BE_TEST(partitioned_hash_table_test)
 #ADD_BE_TEST(olap_scanner_test)
 #ADD_BE_TEST(olap_meta_reader_test)
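The updated test suite below re-enables hash_table_test and adapts it to the new constructor, whose last two arguments are the MemTracker and the initial capacity (`num_buckets`, which now also sizes the first node buffer). GrowTableTest keeps inserting until a 1 MB tracker limit trips; a quick back-of-the-envelope program shows how fast the 1.5x chunk scheme gets there (the 24-byte node size is an assumption for a single build tuple on a 64-bit build, not a number from the patch):

```cpp
#include <cstdint>
#include <cstdio>

int main() {
    // Assumed: sizeof(Node) = 16 (8B next + 4B hash + 1B matched + padding)
    // plus one 8-byte Tuple* of payload = 24 bytes per node.
    const int64_t node_byte_size = 24;
    const int64_t limit = 1024 * 1024; // 1 MB, as in GrowTableTest

    int64_t current = 4; // initial capacity == num_buckets in the tests
    int64_t total = 4;
    int64_t consumed = total * node_byte_size;
    int grows = 0;

    while (consumed < limit) {
        current = total / 2; // grow_node_array(): new chunk = half of total
        total += current;
        consumed += current * node_byte_size;
        ++grows;
    }

    printf("limit hit after %d grows: %lld nodes, %lld bytes\n",
           grows, (long long)total, (long long)consumed);
    return 0;
}
```

Under these assumptions the limit trips after about two dozen grow steps and on the order of 60,000 nodes, so the test's millions of attempted inserts comfortably exercise the `limit_exceeded()` path.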
diff --git a/be/test/exec/hash_table_test.cpp b/be/test/exec/hash_table_test.cpp
index f842239e1abddf..03f0c562317f84 100644
--- a/be/test/exec/hash_table_test.cpp
+++ b/be/test/exec/hash_table_test.cpp
@@ -23,46 +23,66 @@

 #include <stdio.h>
 #include <stdlib.h>
+#include <memory>
+#include <string>
 #include <vector>

 #include "common/compiler_util.h"
 #include "exprs/expr.h"
+#include "exprs/slot_ref.h"
+#include "runtime/exec_env.h"
 #include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
+#include "runtime/test_env.h"
 #include "util/cpu_info.h"
 #include "util/runtime_profile.h"
+#include "util/time.h"

 namespace doris {

-using std::vector;
-using std::map;
-
 class HashTableTest : public testing::Test {
 public:
-    HashTableTest() : _mem_pool() {}
+    HashTableTest() {
+        _tracker = MemTracker::CreateTracker(-1, "root");
+        _pool_tracker = MemTracker::CreateTracker(-1, "mem-pool", _tracker);
+        _mem_pool.reset(new MemPool(_pool_tracker.get()));
+        _state = _pool.add(new RuntimeState(TQueryGlobals()));
+        _state->init_instance_mem_tracker();
+        _state->_exec_env = ExecEnv::GetInstance();
+    }

 protected:
+    RuntimeState* _state;
+    std::shared_ptr<MemTracker> _tracker;
+    std::shared_ptr<MemTracker> _pool_tracker;
     ObjectPool _pool;
-    MemPool _mem_pool;
-    std::vector<Expr*> _build_expr;
-    std::vector<Expr*> _probe_expr;
+    std::shared_ptr<MemPool> _mem_pool;
+    std::vector<ExprContext*> _build_expr;
+    std::vector<ExprContext*> _probe_expr;

     virtual void SetUp() {
         RowDescriptor desc;
         Status status;
+        TypeDescriptor int_desc(TYPE_INT);

         // Not very easy to test complex tuple layouts so this test will use the
         // simplest. The purpose of these tests is to exercise the hash map
         // internals so a simple build/probe expr is fine.
-        _build_expr.push_back(_pool.add(new SlotRef(TYPE_INT, 0)));
-        status = Expr::prepare(_build_expr, NULL, desc);
+        auto build_slot_ref = _pool.add(new SlotRef(int_desc, 0));
+        _build_expr.push_back(_pool.add(new ExprContext(build_slot_ref)));
+        status = Expr::prepare(_build_expr, _state, desc, _tracker);
         EXPECT_TRUE(status.ok());

-        _probe_expr.push_back(_pool.add(new SlotRef(TYPE_INT, 0)));
-        status = Expr::prepare(_probe_expr, NULL, desc);
+        auto probe_slot_ref = _pool.add(new SlotRef(int_desc, 0));
+        _probe_expr.push_back(_pool.add(new ExprContext(probe_slot_ref)));
+        status = Expr::prepare(_probe_expr, _state, desc, _tracker);
         EXPECT_TRUE(status.ok());
     }

+    void TearDown() {
+        Expr::close(_build_expr, _state);
+        Expr::close(_probe_expr, _state);
+    }
+
     TupleRow* create_tuple_row(int32_t val);

     // Wrapper to call private methods on HashTable
@@ -118,7 +138,7 @@ class HashTableTest : public testing::Test {
             EXPECT_TRUE(iter == table->end());
         } else {
             if (scan) {
-                map<TupleRow*, bool> matched;
+                std::map<TupleRow*, bool> matched;

                 while (iter != table->end()) {
                     EXPECT_TRUE(matched.find(iter.get_row()) == matched.end());
@@ -143,8 +163,8 @@ class HashTableTest : public testing::Test {
 };

 TupleRow* HashTableTest::create_tuple_row(int32_t val) {
-    uint8_t* tuple_row_mem = _mem_pool.allocate(sizeof(int32_t*));
-    uint8_t* tuple_mem = _mem_pool.allocate(sizeof(int32_t));
+    uint8_t* tuple_row_mem = _mem_pool->allocate(sizeof(int32_t*));
+    uint8_t* tuple_mem = _mem_pool->allocate(sizeof(int32_t));
     *reinterpret_cast<int32_t*>(tuple_mem) = val;
     TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
     row->set_tuple(0, reinterpret_cast<Tuple*>(tuple_mem));
@@ -173,6 +193,9 @@ TEST_F(HashTableTest, SetupTest) {

 // testing for probe rows that are both there and not.
 // The hash table is rehashed a few times and the scans/finds are tested again.
 TEST_F(HashTableTest, BasicTest) {
+    std::shared_ptr<MemTracker> hash_table_tracker =
+            MemTracker::CreateTracker(-1, "hash-table-basic-tracker", _tracker);
+
     TupleRow* build_rows[5];
     TupleRow* scan_rows[5] = {0};

@@ -190,8 +213,11 @@ TEST_F(HashTableTest, BasicTest) {
         }
     }

-    // Create the hash table and insert the build rows
-    HashTable hash_table(_build_expr, _probe_expr, 1, false, 0);
+    std::vector<bool> is_null_safe = {false};
+    int initial_seed = 1;
+    int64_t num_buckets = 4;
+    HashTable hash_table(_build_expr, _probe_expr, 1, false, is_null_safe, initial_seed,
+                         hash_table_tracker, num_buckets);

     for (int i = 0; i < 5; ++i) {
         hash_table.insert(build_rows[i]);
@@ -226,11 +252,19 @@ TEST_F(HashTableTest, BasicTest) {
     memset(scan_rows, 0, sizeof(scan_rows));
     full_scan(&hash_table, 0, 5, true, scan_rows, build_rows);
     probe_test(&hash_table, probe_rows, 10, false);
+    hash_table.close();
 }

 // This test makes sure we can scan ranges of buckets
 TEST_F(HashTableTest, ScanTest) {
-    HashTable hash_table(_build_expr, _probe_expr, 1, false, 0);
+    std::shared_ptr<MemTracker> hash_table_tracker =
+            MemTracker::CreateTracker(-1, "hash-table-scan-tracker", _tracker);
+
+    std::vector<bool> is_null_safe = {false};
+    int initial_seed = 1;
+    int64_t num_buckets = 4;
+    HashTable hash_table(_build_expr, _probe_expr, 1, false, is_null_safe, initial_seed,
+                         hash_table_tracker, num_buckets);
+
     // Add 1 row with val 1, 2 with val 2, etc
     std::vector<TupleRow*> build_rows;
     ProbeTestData probe_rows[15];
@@ -267,6 +301,8 @@ TEST_F(HashTableTest, ScanTest) {
     resize_table(&hash_table, 2);
     EXPECT_EQ(hash_table.num_buckets(), 2);
     probe_test(&hash_table, probe_rows, 15, true);
+
+    hash_table.close();
 }

 // This test continues adding to the hash table to trigger the resize code paths
@@ -275,8 +311,13 @@ TEST_F(HashTableTest, GrowTableTest) {
     int num_to_add = 4;
     int expected_size = 0;

-    auto mem_tracker = std::make_shared<MemTracker>(1024 * 1024);
-    HashTable hash_table(_build_expr, _probe_expr, 1, false, 0, mem_tracker, num_to_add);
+    std::shared_ptr<MemTracker> mem_tracker =
+            MemTracker::CreateTracker(1024 * 1024, "hash-table-grow-tracker", _tracker);
+    std::vector<bool> is_null_safe = {false};
+    int initial_seed = 1;
+    int64_t num_buckets = 4;
+    HashTable hash_table(_build_expr, _probe_expr, 1, false, is_null_safe, initial_seed,
+                         mem_tracker, num_buckets);
     EXPECT_FALSE(mem_tracker->limit_exceeded());

     // This inserts about 5M entries
@@ -289,6 +330,7 @@ TEST_F(HashTableTest, GrowTableTest) {
         num_to_add *= 2;
         EXPECT_EQ(hash_table.size(), expected_size);
     }
+    LOG(INFO) << "consume:" << mem_tracker->consumption() << ",expected_size:" << expected_size;

     EXPECT_TRUE(mem_tracker->limit_exceeded());

@@ -304,6 +346,7 @@ TEST_F(HashTableTest, GrowTableTest) {
             EXPECT_TRUE(iter == hash_table.end());
         }
     }
+    hash_table.close();
 }

 // This test continues adding to the hash table to trigger the resize code paths
@@ -312,37 +355,39 @@ TEST_F(HashTableTest, GrowTableTest2) {
     int num_to_add = 1024;
     int expected_size = 0;

-    auto mem_tracker = std::make_shared<MemTracker>(1024 * 1024);
-    HashTable hash_table(_build_expr, _probe_expr, 1, false, 0, mem_tracker, num_to_add);
+    std::shared_ptr<MemTracker> mem_tracker =
+            MemTracker::CreateTracker(1024 * 1024, "hash-table-grow2-tracker", _tracker);
+    std::vector<bool> is_null_safe = {false};
+    int initial_seed = 1;
+    int64_t num_buckets = 4;
+    HashTable hash_table(_build_expr, _probe_expr, 1, false, is_null_safe, initial_seed,
+                         mem_tracker, num_buckets);

     LOG(INFO) << time(NULL);

-    // This inserts about 5M entries
-    for (int i = 0; i < 5 * 1024 * 1024; ++i) {
-        hash_table.insert(create_tuple_row(build_row_val));
+    // constexpr const int test_size = 5 * 1024 * 1024;
+    constexpr const int test_size = 5 * 1024 * 100;
+
+    for (int i = 0; i < test_size; ++i) {
+        hash_table.insert(create_tuple_row(build_row_val++));
         expected_size += num_to_add;
     }

     LOG(INFO) << time(NULL);

     // Validate that we can find the entries
-    for (int i = 0; i < 5 * 1024 * 1024; ++i) {
+    for (int i = 0; i < test_size; ++i) {
         TupleRow* probe_row = create_tuple_row(i);
         hash_table.find(probe_row);
     }

     LOG(INFO) << time(NULL);
+    hash_table.close();
 }

 } // namespace doris

 int main(int argc, char** argv) {
-    std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
-    if (!doris::config::init(conffile.c_str(), false)) {
-        fprintf(stderr, "error read config file. \n");
-        return -1;
-    }
-
-    init_glog("be-test");
     ::testing::InitGoogleTest(&argc, argv);
     doris::CpuInfo::init();
     return RUN_ALL_TESTS();
 }
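Finally, note the lifecycle contract the reworked tests encode: trackers are created as children of a root tracker, expression contexts are prepared against the RuntimeState and closed in TearDown(), and every test now calls `hash_table.close()` so that all buffers in `_alloc_list` are freed and the tracked consumption is released. Below is a condensed, non-runnable fragment of that flow; every call in it appears in the diff above, while the variable names and the parameter-name comments are only a reading of the call sites:

```cpp
// Hedged summary of the new test-side usage (fragment, not standalone code).
std::shared_ptr<MemTracker> tracker =
        MemTracker::CreateTracker(-1, "example-tracker", parent_tracker);
std::vector<bool> is_null_safe = {false};
HashTable hash_table(build_expr_ctxs, probe_expr_ctxs,
                     /*num_build_tuples=*/1, /*stores_nulls=*/false,
                     is_null_safe, /*initial_seed=*/1, tracker,
                     /*num_buckets=*/4);

hash_table.insert(row);                        // copies the row's Tuple* array into a node
HashTable::Iterator it = hash_table.find(row); // end() if absent

hash_table.close(); // frees every chunk in _alloc_list and releases
                    // the memory charged to 'tracker'
```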