Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 37 additions & 33 deletions be/src/exec/hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
_initial_seed(initial_seed),
_node_byte_size(sizeof(Node) + sizeof(Tuple*) * _num_build_tuples),
_num_filled_buckets(0),
_nodes(NULL),
_current_nodes(nullptr),
_num_nodes(0),
_current_capacity(num_buckets),
_current_used(0),
_total_capacity(num_buckets),
_exceeded_limit(false),
_mem_tracker(mem_tracker),
_mem_limit_exceeded(false) {
Expand All @@ -61,13 +64,15 @@ HashTable::HashTable(const std::vector<ExprContext*>& build_expr_ctxs,
memset(_expr_values_buffer, 0, sizeof(uint8_t) * _results_buffer_size);
_expr_value_null_bits = new uint8_t[_build_expr_ctxs.size()];

_nodes_capacity = 1024;
_nodes = reinterpret_cast<uint8_t*>(malloc(_nodes_capacity * _node_byte_size));
memset(_nodes, 0, _nodes_capacity * _node_byte_size);
_alloc_list.reserve(10);
_current_nodes = reinterpret_cast<uint8_t*>(malloc(_current_capacity * _node_byte_size));
// TODO: remove memset later
memset(_current_nodes, 0, _current_capacity * _node_byte_size);
_alloc_list.push_back(_current_nodes);

_mem_tracker->Consume(_nodes_capacity * _node_byte_size);
_mem_tracker->Consume(_current_capacity * _node_byte_size);
if (_mem_tracker->limit_exceeded()) {
mem_limit_exceeded(_nodes_capacity * _node_byte_size);
mem_limit_exceeded(_current_capacity * _node_byte_size);
}
}

Expand All @@ -77,8 +82,10 @@ void HashTable::close() {
// TODO: use tr1::array?
delete[] _expr_values_buffer;
delete[] _expr_value_null_bits;
free(_nodes);
_mem_tracker->Release(_nodes_capacity * _node_byte_size);
for (auto ptr : _alloc_list) {
free(ptr);
}
_mem_tracker->Release(_total_capacity * _node_byte_size);
_mem_tracker->Release(_buckets.size() * sizeof(Bucket));
}

Expand Down Expand Up @@ -199,11 +206,10 @@ void HashTable::resize_buckets(int64_t num_buckets) {
Bucket* bucket = &_buckets[i];
Bucket* sister_bucket = &_buckets[i + old_num_buckets];
Node* last_node = NULL;
int node_idx = bucket->_node_idx;
Node* node = bucket->_node;

while (node_idx != -1) {
Node* node = get_node(node_idx);
int64_t next_idx = node->_next_idx;
while (node != nullptr) {
Node* next_node = node->_next;
uint32_t hash = node->_hash;

bool node_must_move = true;
Expand All @@ -219,12 +225,12 @@ void HashTable::resize_buckets(int64_t num_buckets) {
}

if (node_must_move) {
move_node(bucket, move_to, node_idx, node, last_node);
move_node(bucket, move_to, node, last_node);
} else {
last_node = node;
}

node_idx = next_idx;
node = next_node;
}
}

Expand All @@ -233,19 +239,19 @@ void HashTable::resize_buckets(int64_t num_buckets) {
}

void HashTable::grow_node_array() {
int64_t old_size = _nodes_capacity * _node_byte_size;
_nodes_capacity = _nodes_capacity + _nodes_capacity / 2;
int64_t new_size = _nodes_capacity * _node_byte_size;

uint8_t* new_nodes = reinterpret_cast<uint8_t*>(malloc(new_size));
memset(new_nodes, 0, new_size);
memcpy(new_nodes, _nodes, old_size);
free(_nodes);
_nodes = new_nodes;

_mem_tracker->Consume(new_size - old_size);
_current_capacity = _total_capacity / 2;
_total_capacity += _current_capacity;
int64_t alloc_size = _current_capacity * _node_byte_size;
_current_nodes = reinterpret_cast<uint8_t*>(malloc(alloc_size));
_current_used = 0;
// TODO: remove memset later
memset(_current_nodes, 0, alloc_size);
// record the new buffer in _alloc_list so that close() can free it
_alloc_list.push_back(_current_nodes);

_mem_tracker->Consume(alloc_size);
if (_mem_tracker->limit_exceeded()) {
mem_limit_exceeded(new_size - old_size);
mem_limit_exceeded(alloc_size);
}
}

Expand All @@ -262,29 +268,27 @@ std::string HashTable::debug_string(bool skip_empty, const RowDescriptor* desc)
ss << std::endl;

for (int i = 0; i < _buckets.size(); ++i) {
int64_t node_idx = _buckets[i]._node_idx;
Node* node = _buckets[i]._node;
bool first = true;

if (skip_empty && node_idx == -1) {
if (skip_empty && node == nullptr) {
continue;
}

ss << i << ": ";

while (node_idx != -1) {
Node* node = get_node(node_idx);

while (node != nullptr) {
if (!first) {
ss << ",";
}

if (desc == NULL) {
ss << node_idx << "(" << (void*)node->data() << ")";
ss << node->_hash << "(" << (void*)node->data() << ")";
} else {
ss << (void*)node->data() << " " << node->data()->to_string(*desc);
}

node_idx = node->_next_idx;
node = node->_next;
first = false;
}

Expand Down
72 changes: 33 additions & 39 deletions be/src/exec/hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "codegen/doris_ir.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "util/hash_util.hpp"

namespace doris {
Expand Down Expand Up @@ -142,7 +143,7 @@ class HashTable {

// Returns the number of bytes allocated to the hash table
int64_t byte_size() const {
return _node_byte_size * _nodes_capacity + sizeof(Bucket) * _buckets.size();
return _node_byte_size * _total_capacity + sizeof(Bucket) * _buckets.size();
}

// Returns the results of the exprs at 'expr_idx' evaluated over the last row
Expand Down Expand Up @@ -172,7 +173,7 @@ class HashTable {
// stl-like iterator interface.
class Iterator {
public:
Iterator() : _table(NULL), _bucket_idx(-1), _node_idx(-1) {}
Iterator() : _table(NULL), _bucket_idx(-1), _node(nullptr) {}

// Iterates to the next element. In the case where the iterator was
// from a Find, this will lazily evaluate that bucket, only returning
Expand All @@ -182,54 +183,52 @@ class HashTable {

// Returns the current row or NULL if at end.
TupleRow* get_row() {
if (_node_idx == -1) {
if (_node == nullptr) {
return NULL;
}
return _table->get_node(_node_idx)->data();
return _node->data();
}

// Returns the cached hash of the node the iterator currently points to.
// The iterator must not be at the end.
uint32_t get_hash() { return _table->get_node(_node_idx)->_hash; }
uint32_t get_hash() { return _node->_hash; }

// Returns true if the iterator is not at the end, i.e. it currently points to a node.
bool has_next() { return _node_idx != -1; }
bool has_next() { return _node != nullptr; }

// Returns true if this iterator is at the end, i.e. get_row() cannot be called.
bool at_end() { return _node_idx == -1; }
bool at_end() { return _node == nullptr; }

// Marks the node the iterator currently points to as matched. The iterator
// must not be at_end().
void set_matched() {
DCHECK(!at_end());
Node* node = _table->get_node(_node_idx);
node->matched = true;
_node->matched = true;
}

bool matched() {
DCHECK(!at_end());
Node* node = _table->get_node(_node_idx);
return node->matched;
return _node->matched;
}

bool operator==(const Iterator& rhs) {
return _bucket_idx == rhs._bucket_idx && _node_idx == rhs._node_idx;
return _bucket_idx == rhs._bucket_idx && _node == rhs._node;
}

bool operator!=(const Iterator& rhs) {
return _bucket_idx != rhs._bucket_idx || _node_idx != rhs._node_idx;
return _bucket_idx != rhs._bucket_idx || _node != rhs._node;
}

private:
friend class HashTable;

Iterator(HashTable* table, int bucket_idx, int64_t node, uint32_t hash)
: _table(table), _bucket_idx(bucket_idx), _node_idx(node), _scan_hash(hash) {}
Iterator(HashTable* table, int bucket_idx, Node* node, uint32_t hash)
: _table(table), _bucket_idx(bucket_idx), _node(node), _scan_hash(hash) {}

HashTable* _table;
// Current bucket idx
int64_t _bucket_idx;
// Current node idx (within current bucket)
int64_t _node_idx;
// Current node (within current bucket)
Node* _node;
// cached hash value for the row passed to find()
uint32_t _scan_hash;
};
Expand All @@ -241,11 +240,11 @@ class HashTable {
// Header portion of a Node. The node data (TupleRow) is right after the
// node memory to maximize cache hits.
struct Node {
int64_t _next_idx; // chain to next node for collisions
uint32_t _hash; // Cache of the hash for _data
Node* _next; // chain to next node for collisions
uint32_t _hash; // Cache of the hash for _data
bool matched;

Node() : _next_idx(-1), _hash(-1), matched(false) {}
Node() : _next(nullptr), _hash(-1), matched(false) {}

TupleRow* data() {
uint8_t* mem = reinterpret_cast<uint8_t*>(this);
Expand All @@ -255,22 +254,14 @@ class HashTable {
};

struct Bucket {
int64_t _node_idx;

Bucket() { _node_idx = -1; }
Bucket() { _node = nullptr; }
Node* _node;
};

// Returns the next non-empty bucket and updates idx to be the index of that bucket.
// If there are no more buckets, returns NULL and sets idx to -1
Bucket* next_bucket(int64_t* bucket_idx);

// Returns node at idx. Tracking structures do not use pointers since they will
// change as the HashTable grows.
Node* get_node(int64_t idx) {
DCHECK_NE(idx, -1);
return reinterpret_cast<Node*>(_nodes + _node_byte_size * idx);
}

// Resize the hash table to 'num_buckets'
void resize_buckets(int64_t num_buckets);

Expand All @@ -279,12 +270,11 @@ class HashTable {

// Chains 'node' to 'bucket'. Nodes in a bucket are chained
// as a linked list; this places the new node at the beginning of the list.
void add_to_bucket(Bucket* bucket, int64_t node_idx, Node* node);
void add_to_bucket(Bucket* bucket, Node* node);

// Moves a node from one bucket to another. 'previous_node' refers to the
// node (if any) that's chained before this node in from_bucket's linked list.
void move_node(Bucket* from_bucket, Bucket* to_bucket, int64_t node_idx, Node* node,
Node* previous_node);
void move_node(Bucket* from_bucket, Bucket* to_bucket, Node* node, Node* previous_node);

// Evaluate the exprs over row and cache the results in '_expr_values_buffer'.
// Returns whether any expr evaluated to NULL
Expand Down Expand Up @@ -354,14 +344,16 @@ class HashTable {
const int _node_byte_size;
// Number of non-empty buckets. Used to determine when to grow and rehash
int64_t _num_filled_buckets;
// Memory to store node data. This is not allocated from a pool to take advantage
// of realloc.
// TODO: integrate with mem pools
uint8_t* _nodes;
// Most recently allocated buffer for node data. Earlier buffers stay alive
// in '_alloc_list' until close().
uint8_t* _current_nodes;
// number of nodes stored (i.e. size of hash table)
int64_t _num_nodes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int64_t _num_nodes;
int64_t _total_used;

Easier to understand

// max number of nodes that can be stored in '_nodes' before realloc
int64_t _nodes_capacity;
// current nodes buffer capacity
int64_t _current_capacity;
// usage counter for the current nodes buffer; reset to 0 whenever a new buffer is allocated
int64_t _current_used;
// total capacity, in nodes, summed over all allocated node buffers
int64_t _total_capacity;

bool _exceeded_limit; // true if any of _mem_trackers[].limit_exceeded()

Expand Down Expand Up @@ -395,6 +387,8 @@ class HashTable {
// Use bytes instead of bools to be compatible with llvm. This address must
// not change once allocated.
uint8_t* _expr_value_null_bits;
// every node buffer allocated so far; each entry is freed in close()
std::vector<uint8_t*> _alloc_list;
};

} // namespace doris
Expand Down
Loading