@@ -535,7 +535,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
}

std::vector<uint32_t> partition_indexes[_partition_count];
auto* channel_ids = reinterpret_cast<uint64_t*>(local_state._partitioner->get_channel_ids());
auto* channel_ids = reinterpret_cast<uint32_t*>(local_state._partitioner->get_channel_ids());
for (uint32_t i = 0; i != rows; ++i) {
partition_indexes[channel_ids[i]].emplace_back(i);
}
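Note: with the switch to the CRC32-based partitioner (see the header change below), the per-row channel ids are 32-bit values, so the buffer returned by `get_channel_ids()` is now read through a `uint32_t*` instead of a `uint64_t*`. The loop above then buckets row indices by channel. A minimal, self-contained sketch of that scatter step (illustrative names, not the operator's actual interfaces):

```cpp
#include <cstdint>
#include <vector>

// Bucket row indices by their 32-bit channel id: the same scatter that the
// push() hunk above performs with partition_indexes[channel_ids[i]].
std::vector<std::vector<uint32_t>> scatter_by_channel(const uint32_t* channel_ids,
                                                      uint32_t rows,
                                                      uint32_t partition_count) {
    std::vector<std::vector<uint32_t>> partition_indexes(partition_count);
    for (uint32_t i = 0; i != rows; ++i) {
        partition_indexes[channel_ids[i]].emplace_back(i);
    }
    return partition_indexes;
}
```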
@@ -862,6 +862,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
RETURN_IF_ERROR(
_inner_probe_operator->pull(local_state._runtime_state.get(), block, eos));
if (*eos) {
_update_profile_from_internal_states(local_state);
local_state._runtime_state.reset();
}
}
@@ -33,7 +33,7 @@ class RuntimeState;

namespace pipeline {

using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;

class PartitionedHashJoinProbeOperatorX;

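Note: this header (the probe-side operator header, judging by the `PartitionedHashJoinProbeOperatorX` forward declaration above) and partitioned_hash_join_sink_operator.h below change the shared `PartitionerType` alias from `XXHashPartitioner` to `Crc32HashPartitioner`, so the build and probe sides keep hashing rows to the same partitions; the `uint32_t` channel-id casts in the .cpp hunks follow from the 32-bit hash. A rough sketch of what CRC32-based channel assignment amounts to (the helper below is a stand-in for illustration, not Doris' vectorized implementation):

```cpp
#include <cstddef>
#include <cstdint>

// Stand-in CRC32 (reflected polynomial 0xEDB88320), computed bit by bit for
// clarity. Doris' Crc32HashPartitioner uses its own vectorized hashing; this
// only illustrates how a 32-bit hash maps to a channel/partition id.
uint32_t crc32_hash(const void* data, size_t len) {
    const auto* bytes = static_cast<const uint8_t*>(data);
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < len; ++i) {
        crc ^= bytes[i];
        for (int bit = 0; bit < 8; ++bit) {
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return ~crc;
}

// LocalExchangeChannelIds-style reduction of the hash to [0, num_channels).
uint32_t channel_of(const void* key, size_t len, uint32_t num_channels) {
    return crc32_hash(key, len) % num_channels;
}
```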
166 changes: 129 additions & 37 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -45,8 +45,21 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,

Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
for (uint32_t i = 0; i != p._partition_count; ++i) {
auto& spilling_stream = _shared_state->spilled_streams[i];
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, spilling_stream, print_id(state->query_id()),
fmt::format("hash_build_sink_{}", i), _parent->id(),
std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _profile));
RETURN_IF_ERROR(spilling_stream->prepare_spill());
spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count,
_spill_data_size, _spill_write_disk_timer,
_spill_write_wait_io_timer);
}
return _partitioner->open(state);
}

Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(PipelineXSpillSinkLocalState::exec_time_counter());
SCOPED_TIMER(PipelineXSpillSinkLocalState::_close_timer);
@@ -87,39 +100,127 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
return mem_size;
}

Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
_shared_state->need_to_spill = true;
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
auto build_block = std::move(_shared_state->inner_shared_state->build_block);
if (!build_block) {
build_block = vectorized::Block::create_shared();
auto inner_sink_state = _shared_state->inner_runtime_state->get_sink_local_state();
if (inner_sink_state) {
auto& mutable_block = reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
->_build_side_mutable_block;
*build_block = mutable_block.to_block();
LOG(INFO) << "hash join sink will revoke build mutable block: "
<< build_block->allocated_bytes();
}
}

/// Here need to skip the first row in build block.
/// The first row in build block is generated by `HashJoinBuildSinkOperatorX::sink`.
if (build_block->rows() <= 1) {
return Status::OK();
}

if (build_block->columns() > row_desc.num_slots()) {
build_block->erase(row_desc.num_slots());
}

{
/// TODO: DO NOT execute build exprs twice(when partition and building hash table)
SCOPED_TIMER(_partition_timer);
RETURN_IF_ERROR(
_partitioner->do_partitioning(state, build_block.get(), _mem_tracker.get()));
}

auto execution_context = state->get_task_execution_context();
_dependency->block();
auto spill_func = [execution_context, build_block, state, this]() {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was cancelled.";
return;
}
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
SCOPED_TIMER(_partition_shuffle_timer);
auto* channel_ids = reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());

auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
std::vector<uint32_t> partition_indices;
const auto reserved_size = 4096;
partition_indices.reserve(reserved_size);

auto flush_rows = [&partition_indices, &build_block, &state, this](
std::unique_ptr<vectorized::MutableBlock>& partition_block,
vectorized::SpillStreamSPtr& spilling_stream) {
auto* begin = &(partition_indices[0]);
const auto count = partition_indices.size();
if (!partition_block) {
partition_block =
vectorized::MutableBlock::create_unique(build_block->clone_empty());
}
partition_block->add_rows(build_block.get(), begin, begin + count);
partition_indices.clear();

if (partition_block->allocated_bytes() >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
auto block = partition_block->to_block();
partition_block =
vectorized::MutableBlock::create_unique(build_block->clone_empty());
auto status = spilling_stream->spill_block(state, block, false);

if (!status.ok()) {
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status = status;
_spill_status_ok = false;
_dependency->set_ready();
return false;
}
}
return true;
};

for (uint32_t i = 0; i != p._partition_count; ++i) {
vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
DCHECK(spilling_stream != nullptr);

const auto rows = build_block->rows();
for (size_t idx = 1; idx != rows; ++idx) {
if (channel_ids[idx] == i) {
partition_indices.emplace_back(idx);
} else {
continue;
}

const auto count = partition_indices.size();
if (UNLIKELY(count >= reserved_size)) {
if (!flush_rows(partitioned_blocks[i], spilling_stream)) {
break;
}
}
}

if (!partition_indices.empty()) {
flush_rows(partitioned_blocks[i], spilling_stream);
}
}

_dependency->set_ready();
};
auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
return thread_pool->submit_func(spill_func);
}
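Note: `_revoke_unpartitioned_block` handles the case where spilling kicks in before the build block has been partitioned: it runs the partitioner over the whole block, then, per partition, buffers matching row indices (up to `reserved_size` at a time), appends them to that partition's mutable block, and spills the block once it exceeds `SpillStream::MIN_SPILL_WRITE_BATCH_MEM`. A reduced model of that accumulate-and-flush loop (simplified types and size accounting, not the operator's actual code):

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Toy stand-in for a per-partition MutableBlock: just row indices plus a
// byte estimate used to decide when to hand the batch to the spill stream.
struct Batch {
    std::vector<uint32_t> rows;
    size_t bytes = 0;
};

void partition_and_spill(const uint32_t* channel_ids, uint32_t num_rows,
                         uint32_t partition_count, size_t spill_threshold_bytes,
                         const std::function<void(uint32_t, Batch&)>& spill) {
    std::vector<Batch> batches(partition_count);
    std::vector<uint32_t> pending;   // like partition_indices in the lambda above
    const size_t buffer_cap = 4096;  // mirrors reserved_size
    pending.reserve(buffer_cap);

    for (uint32_t p = 0; p < partition_count; ++p) {
        auto flush = [&]() {
            for (uint32_t idx : pending) {
                batches[p].rows.push_back(idx);
                batches[p].bytes += sizeof(uint32_t);  // toy size accounting
            }
            pending.clear();
            if (batches[p].bytes >= spill_threshold_bytes) {
                spill(p, batches[p]);  // hand the accumulated batch to the spill stream
                batches[p] = Batch{};
            }
        };
        // Row 0 is skipped: it is the placeholder row added by
        // HashJoinBuildSinkOperatorX::sink, as the comments above note.
        for (uint32_t idx = 1; idx < num_rows; ++idx) {
            if (channel_ids[idx] != p) continue;
            pending.push_back(idx);
            if (pending.size() >= buffer_cap) flush();
        }
        if (!pending.empty()) flush();
    }
}
```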

Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
LOG(INFO) << "hash join sink " << _parent->id() << " revoke_memory"
<< ", eos: " << _child_eos;
DCHECK_EQ(_spilling_streams_count, 0);

if (!_shared_state->need_to_spill) {
profile()->add_info_string("Spilled", "true");
_shared_state->need_to_spill = true;
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
auto build_block = std::move(_shared_state->inner_shared_state->build_block);
if (!build_block) {
build_block = vectorized::Block::create_shared();
auto inner_sink_state = _shared_state->inner_runtime_state->get_sink_local_state();
if (inner_sink_state) {
auto& mutable_block =
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
->_build_side_mutable_block;
*build_block = mutable_block.to_block();
LOG(INFO) << "hash join sink will revoke build mutable block: "
<< build_block->allocated_bytes();
}
}

/// Here need to skip the first row in build block.
/// The first row in build block is generated by `HashJoinBuildSinkOperatorX::sink`.
if (build_block->rows() > 1) {
if (build_block->columns() > row_desc.num_slots()) {
build_block->erase(row_desc.num_slots());
}
RETURN_IF_ERROR(_partition_block(state, build_block.get(), 1, build_block->rows()));
}
return _revoke_unpartitioned_block(state);
}

_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
@@ -133,16 +234,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
continue;
}

if (!spilling_stream) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, spilling_stream, print_id(state->query_id()), "hash_build_sink",
_parent->id(), std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _profile));
RETURN_IF_ERROR(spilling_stream->prepare_spill());
spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count,
_spill_data_size, _spill_write_disk_timer,
_spill_write_wait_io_timer);
}
DCHECK(spilling_stream != nullptr);

auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
@@ -201,7 +293,7 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,

auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
SCOPED_TIMER(_partition_shuffle_timer);
auto* channel_ids = reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids());
auto* channel_ids = reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());
std::vector<uint32_t> partition_indexes[p._partition_count];
DCHECK_LT(begin, end);
for (size_t i = begin; i != end; ++i) {
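Note: registration of the per-partition spill streams moves out of `revoke_memory()` and into `PartitionedHashJoinSinkLocalState::open()` (one stream per partition, named `hash_build_sink_{i}`), so by the time memory is revoked the streams are guaranteed to exist and the old lazy-registration block is replaced with a `DCHECK`. The unpartitioned-block path also runs asynchronously: the sink blocks its dependency, submits the spill work to the spill thread pool, and the task calls `set_ready()` when it finishes. A simplified model of that handshake (the `Dependency` class here is a stand-in for the pipeline framework's; lifetime is kept deliberately simple, whereas the real code guards against cancellation via the task execution context):

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Minimal stand-in for the pipeline Dependency used above: block() parks
// downstream work, set_ready() releases it once the spill task is done.
class Dependency {
public:
    void block() { std::lock_guard<std::mutex> g(_m); _ready = false; }
    void set_ready() {
        { std::lock_guard<std::mutex> g(_m); _ready = true; }
        _cv.notify_all();
    }
    void wait_ready() {
        std::unique_lock<std::mutex> l(_m);
        _cv.wait(l, [this] { return _ready; });
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    bool _ready = true;
};

// Block the dependency, run the spill work off-thread, and always re-enable
// the dependency afterwards (errors are reported separately, as the operator
// does with _spill_status).
void revoke_async(Dependency& dep, std::function<void()> spill_work) {
    dep.block();
    std::thread([&dep, work = std::move(spill_work)]() {
        work();
        dep.set_ready();
    }).detach();
}
```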
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -34,7 +34,7 @@ class RuntimeState;

namespace pipeline {

using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
using PartitionerType = vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;

class PartitionedHashJoinSinkOperatorX;

@@ -60,6 +60,8 @@ class PartitionedHashJoinSinkLocalState
Status _partition_block(RuntimeState* state, vectorized::Block* in_block, size_t begin,
size_t end);

Status _revoke_unpartitioned_block(RuntimeState* state);

friend class PartitionedHashJoinSinkOperatorX;

std::atomic_int _spilling_streams_count {0};