diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 587b1d65b970b8..3efea5d8c6a705 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -55,7 +55,8 @@ Overload(Callables&&... callables) -> Overload; template struct ProcessHashTableBuild { ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, - HashJoinNode* join_node, int batch_size, uint8_t offset) + HashJoinNode* join_node, int batch_size, uint8_t offset, + RuntimeState* state) : _rows(rows), _skip_rows(0), _acquired_block(acquired_block), @@ -63,6 +64,7 @@ struct ProcessHashTableBuild { _join_node(join_node), _batch_size(batch_size), _offset(offset), + _state(state), _build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {} template @@ -121,6 +123,9 @@ struct ProcessHashTableBuild { } for (size_t k = 0; k < _rows; ++k) { + if (k % 65536 == 0) { + RETURN_IF_CANCELLED(_state); + } if constexpr (ignore_null) { if ((*null_map)[k]) { continue; @@ -148,6 +153,9 @@ struct ProcessHashTableBuild { bool build_unique = _join_node->_build_unique; #define EMPLACE_IMPL(stmt) \ for (size_t k = 0; k < _rows; ++k) { \ + if (k % 65536 == 0) { \ + RETURN_IF_CANCELLED(_state); \ + } \ if constexpr (ignore_null) { \ if ((*null_map)[k]) { \ continue; \ @@ -208,6 +216,7 @@ struct ProcessHashTableBuild { HashJoinNode* _join_node; int _batch_size; uint8_t _offset; + RuntimeState* _state; ProfileCounter* _build_side_compute_hash_timer; std::vector _build_side_hash_values; @@ -911,7 +920,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin auto short_circuit_for_null_in_build_side) -> Status { using HashTableCtxType = std::decay_t; ProcessHashTableBuild hash_table_build_process( - rows, block, raw_ptrs, this, state->batch_size(), offset); + rows, block, raw_ptrs, this, state->batch_size(), offset, state); return hash_table_build_process .template run( arg, diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 8ce115813cad1c..418aa06d8575d1 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -26,12 +26,14 @@ namespace vectorized { template struct HashTableBuild { HashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs& build_raw_ptrs, - VSetOperationNode* operation_node, uint8_t offset) + VSetOperationNode* operation_node, uint8_t offset, + RuntimeState* state) : _rows(rows), _offset(offset), _acquired_block(acquired_block), _build_raw_ptrs(build_raw_ptrs), - _operation_node(operation_node) {} + _operation_node(operation_node), + _state(state) {} Status operator()(HashTableContext& hash_table_ctx) { using KeyGetter = typename HashTableContext::State; @@ -51,6 +53,9 @@ struct HashTableBuild { } for (size_t k = 0; k < _rows; ++k) { + if (k % 65536 == 0) { + RETURN_IF_CANCELLED(_state); + } auto emplace_result = key_getter.emplace_key(hash_table_ctx.hash_table, k, *(_operation_node->_arena)); @@ -72,6 +77,7 @@ struct HashTableBuild { Block& _acquired_block; ColumnRawPtrs& _build_raw_ptrs; VSetOperationNode* _operation_node; + RuntimeState* _state; }; VSetOperationNode::VSetOperationNode(ObjectPool* pool, const TPlanNode& tnode, @@ -271,7 +277,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { _build_blocks.emplace_back(mutable_block.to_block()); // TODO:: Rethink may we should do the proess after we recevie all build blocks ? // which is better. - RETURN_IF_ERROR(process_build_block(_build_blocks[index], index)); + RETURN_IF_ERROR(process_build_block(_build_blocks[index], index, state)); mutable_block = MutableBlock(); ++index; last_mem_used = _mem_used; @@ -280,11 +286,11 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { _build_blocks.emplace_back(mutable_block.to_block()); child(0)->close(state); - RETURN_IF_ERROR(process_build_block(_build_blocks[index], index)); + RETURN_IF_ERROR(process_build_block(_build_blocks[index], index, state)); return Status::OK(); } -Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) { +Status VSetOperationNode::process_build_block(Block& block, uint8_t offset, RuntimeState* state) { size_t rows = block.rows(); if (rows == 0) { return Status::OK(); @@ -299,7 +305,7 @@ Status VSetOperationNode::process_build_block(Block& block, uint8_t offset) { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { HashTableBuild hash_table_build_process(rows, block, raw_ptrs, - this, offset); + this, offset, state); hash_table_build_process(arg); } else { LOG(FATAL) << "FATAL: uninited hash table"; diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index 902f19efa4720f..d21231bf6f9320 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -49,7 +49,7 @@ class VSetOperationNode : public ExecNode { //It's time to abstract out the same methods and provide them directly to others; void hash_table_init(); Status hash_table_build(RuntimeState* state); - Status process_build_block(Block& block, uint8_t offset); + Status process_build_block(Block& block, uint8_t offset, RuntimeState* state); Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs); Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int child_id); template