From c97ca372dca90998424ad47da399b4702adb14e2 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 24 May 2023 10:15:16 +0800 Subject: [PATCH] 1 --- be/src/vec/exec/join/vhash_join_node.cpp | 13 +++++++++++-- be/src/vec/exec/vset_operation_node.cpp | 20 ++++++++++++++------ be/src/vec/exec/vset_operation_node.h | 2 +- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index fee17c952dfc76..72b5a6df350424 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -96,7 +96,8 @@ Overload(Callables&&... callables) -> Overload; template struct ProcessHashTableBuild { ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, - HashJoinNode* join_node, int batch_size, uint8_t offset) + HashJoinNode* join_node, int batch_size, uint8_t offset, + RuntimeState* state) : _rows(rows), _skip_rows(0), _acquired_block(acquired_block), @@ -104,6 +105,7 @@ struct ProcessHashTableBuild { _join_node(join_node), _batch_size(batch_size), _offset(offset), + _state(state), _build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {} template @@ -170,6 +172,9 @@ struct ProcessHashTableBuild { } for (size_t k = 0; k < _rows; ++k) { + if (k % 65536 == 0) { + RETURN_IF_CANCELLED(_state); + } if constexpr (ignore_null) { if ((*null_map)[k]) { continue; @@ -198,6 +203,9 @@ struct ProcessHashTableBuild { bool build_unique = _join_node->_build_unique; #define EMPLACE_IMPL(stmt) \ for (size_t k = 0; k < _rows; ++k) { \ + if (k % 65536 == 0) { \ + RETURN_IF_CANCELLED(_state); \ + } \ if constexpr (ignore_null) { \ if ((*null_map)[k]) { \ continue; \ @@ -262,6 +270,7 @@ struct ProcessHashTableBuild { HashJoinNode* _join_node; int _batch_size; uint8_t _offset; + RuntimeState* _state; ProfileCounter* _build_side_compute_hash_timer; std::vector _build_side_hash_values; @@ -1064,7 +1073,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin auto short_circuit_for_null_in_build_side) -> Status { using HashTableCtxType = std::decay_t; ProcessHashTableBuild hash_table_build_process( - rows, block, raw_ptrs, this, state->batch_size(), offset); + rows, block, raw_ptrs, this, state->batch_size(), offset, state); return hash_table_build_process .template run( arg, diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 4e83ffb7e8737e..d18ee55931a397 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -57,11 +57,13 @@ namespace vectorized { template struct HashTableBuild { HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs, - VSetOperationNode* operation_node, uint8_t offset) + VSetOperationNode* operation_node, uint8_t offset, + RuntimeState* state) : _rows(rows), _offset(offset), _build_raw_ptrs(build_raw_ptrs), - _operation_node(operation_node) {} + _operation_node(operation_node), + _state(state) {} Status operator()(HashTableContext& hash_table_ctx) { using KeyGetter = typename HashTableContext::State; @@ -81,6 +83,9 @@ struct HashTableBuild { } for (size_t k = 0; k < _rows; ++k) { + if (k % 65536 == 0) { + RETURN_IF_CANCELLED(_state); + } auto emplace_result = key_getter.emplace_key(hash_table_ctx.hash_table, k, *(_operation_node->_arena)); @@ -100,6 +105,7 @@ struct HashTableBuild { const uint8_t _offset; ColumnRawPtrs& _build_raw_ptrs; VSetOperationNode* _operation_node; + RuntimeState* _state; }; template @@ -375,7 +381,7 @@ void VSetOperationNode::hash_table_init() { } template -Status VSetOperationNode::sink(RuntimeState*, Block* block, bool eos) { +Status VSetOperationNode::sink(RuntimeState* state, Block* block, bool eos) { constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL; if (block->rows() != 0) { @@ -385,7 +391,8 @@ Status VSetOperationNode::sink(RuntimeState*, Block* block, bool e if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) { _build_blocks.emplace_back(_mutable_block.to_block()); - RETURN_IF_ERROR(process_build_block(_build_blocks[_build_block_index], _build_block_index)); + RETURN_IF_ERROR( + process_build_block(_build_blocks[_build_block_index], _build_block_index, state)); _mutable_block.clear(); ++_build_block_index; @@ -456,7 +463,8 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { } template -Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) { +Status VSetOperationNode::process_build_block(Block& block, uint8_t offset, + RuntimeState* state) { size_t rows = block.rows(); if (rows == 0) { return Status::OK(); @@ -471,7 +479,7 @@ Status VSetOperationNode::process_build_block(Block& block, uint8_ using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { HashTableBuild hash_table_build_process( - rows, raw_ptrs, this, offset); + rows, raw_ptrs, this, offset, state); hash_table_build_process(arg); } else { LOG(FATAL) << "FATAL: uninited hash table"; diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index b833658c7d6206..f4c9e0c87031ac 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -79,7 +79,7 @@ class VSetOperationNode final : public ExecNode { //It's time to abstract out the same methods and provide them directly to others; void hash_table_init(); Status hash_table_build(RuntimeState* state); - Status process_build_block(Block& block, uint8_t offset); + Status process_build_block(Block& block, uint8_t offset, RuntimeState* state); Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs); Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int child_id); void refresh_hash_table();