Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ Overload(Callables&&... callables) -> Overload<Callables...>;
template <class HashTableContext>
struct ProcessHashTableBuild {
ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs,
HashJoinNode* join_node, int batch_size, uint8_t offset)
HashJoinNode* join_node, int batch_size, uint8_t offset,
RuntimeState* state)
: _rows(rows),
_skip_rows(0),
_acquired_block(acquired_block),
_build_raw_ptrs(build_raw_ptrs),
_join_node(join_node),
_batch_size(batch_size),
_offset(offset),
_state(state),
_build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {}

template <bool ignore_null, bool short_circuit_for_null>
Expand Down Expand Up @@ -121,6 +123,9 @@ struct ProcessHashTableBuild {
}

for (size_t k = 0; k < _rows; ++k) {
if (k % 65536 == 0) {
RETURN_IF_CANCELLED(_state);
}
if constexpr (ignore_null) {
if ((*null_map)[k]) {
continue;
Expand Down Expand Up @@ -148,6 +153,9 @@ struct ProcessHashTableBuild {
bool build_unique = _join_node->_build_unique;
#define EMPLACE_IMPL(stmt) \
for (size_t k = 0; k < _rows; ++k) { \
if (k % 65536 == 0) { \
RETURN_IF_CANCELLED(_state); \
} \
if constexpr (ignore_null) { \
if ((*null_map)[k]) { \
continue; \
Expand Down Expand Up @@ -208,6 +216,7 @@ struct ProcessHashTableBuild {
HashJoinNode* _join_node;
int _batch_size;
uint8_t _offset;
RuntimeState* _state;

ProfileCounter* _build_side_compute_hash_timer;
std::vector<size_t> _build_side_hash_values;
Expand Down Expand Up @@ -911,7 +920,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
auto short_circuit_for_null_in_build_side) -> Status {
using HashTableCtxType = std::decay_t<decltype(arg)>;
ProcessHashTableBuild<HashTableCtxType> hash_table_build_process(
rows, block, raw_ptrs, this, state->batch_size(), offset);
rows, block, raw_ptrs, this, state->batch_size(), offset, state);
return hash_table_build_process
.template run<has_null_value, short_circuit_for_null_in_build_side>(
arg,
Expand Down
18 changes: 12 additions & 6 deletions be/src/vec/exec/vset_operation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ namespace vectorized {
template <class HashTableContext>
struct HashTableBuild {
HashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs,
VSetOperationNode* operation_node, uint8_t offset)
VSetOperationNode* operation_node, uint8_t offset,
RuntimeState* state)
: _rows(rows),
_offset(offset),
_acquired_block(acquired_block),
_build_raw_ptrs(build_raw_ptrs),
_operation_node(operation_node) {}
_operation_node(operation_node),
_state(state) {}

Status operator()(HashTableContext& hash_table_ctx) {
using KeyGetter = typename HashTableContext::State;
Expand All @@ -51,6 +53,9 @@ struct HashTableBuild {
}

for (size_t k = 0; k < _rows; ++k) {
if (k % 65536 == 0) {
RETURN_IF_CANCELLED(_state);
}
auto emplace_result = key_getter.emplace_key(hash_table_ctx.hash_table, k,
*(_operation_node->_arena));

Expand All @@ -72,6 +77,7 @@ struct HashTableBuild {
Block& _acquired_block;
ColumnRawPtrs& _build_raw_ptrs;
VSetOperationNode* _operation_node;
RuntimeState* _state;
};

VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
Expand Down Expand Up @@ -271,7 +277,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {
_build_blocks.emplace_back(mutable_block.to_block());
// TODO:: Rethink may we should do the proess after we recevie all build blocks ?
// which is better.
RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
RETURN_IF_ERROR(process_build_block(_build_blocks[index], index, state));
mutable_block = MutableBlock();
++index;
last_mem_used = _mem_used;
Expand All @@ -280,11 +286,11 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) {

_build_blocks.emplace_back(mutable_block.to_block());
child(0)->close(state);
RETURN_IF_ERROR(process_build_block(_build_blocks[index], index));
RETURN_IF_ERROR(process_build_block(_build_blocks[index], index, state));
return Status::OK();
}

Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) {
Status VSetOperationNode::process_build_block(Block& block, uint8_t offset, RuntimeState* state) {
size_t rows = block.rows();
if (rows == 0) {
return Status::OK();
Expand All @@ -299,7 +305,7 @@ Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
HashTableBuild<HashTableCtxType> hash_table_build_process(rows, block, raw_ptrs,
this, offset);
this, offset, state);
hash_table_build_process(arg);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/vset_operation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class VSetOperationNode : public ExecNode {
//It's time to abstract out the same methods and provide them directly to others;
void hash_table_init();
Status hash_table_build(RuntimeState* state);
Status process_build_block(Block& block, uint8_t offset);
Status process_build_block(Block& block, uint8_t offset, RuntimeState* state);
Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs);
Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int child_id);
template <bool keep_matched>
Expand Down