Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 180 additions & 76 deletions be/src/pipeline/exec/exchange_sink_buffer.cpp

Large diffs are not rendered by default.

15 changes: 9 additions & 6 deletions be/src/pipeline/exec/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,11 @@ class BroadcastPBlockHolderMemLimiter

namespace pipeline {
struct TransmitInfo {
vectorized::Channel* channel = nullptr;
std::unique_ptr<PBlock> block;
bool eos;
};

struct BroadcastTransmitInfo {
vectorized::Channel* channel = nullptr;
std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
bool eos;
};
Expand All @@ -144,10 +142,13 @@ struct RpcInstance {
int64_t seq = 0;

// Queue for regular data transmission requests
std::queue<TransmitInfo, std::list<TransmitInfo>> package_queue;
std::unordered_map<vectorized::Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>>
package_queue;

// Queue for broadcast data transmission requests
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> broadcast_package_queue;
std::unordered_map<vectorized::Channel*,
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>
broadcast_package_queue;

// RPC request parameters for data transmission
std::shared_ptr<PTransmitDataParams> request;
Expand Down Expand Up @@ -275,8 +276,8 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {

void construct_request(TUniqueId);

Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
Status add_block(vectorized::Channel* channel, TransmitInfo&& request);
Status add_block(vectorized::Channel* channel, BroadcastTransmitInfo&& request);
void close();
void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time);
void update_profile(RuntimeProfile* profile);
Expand Down Expand Up @@ -341,6 +342,8 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
// The ExchangeSinkLocalState in _parents is only used in _turn_off_channel.
std::vector<ExchangeSinkLocalState*> _parents;
const int64_t _exchange_sink_num;
bool _send_multi_blocks = false;
int _send_multi_blocks_byte_size = 256 * 1024;
};

} // namespace pipeline
Expand Down
12 changes: 12 additions & 0 deletions be/src/vec/runtime/vdata_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
}

bool eos = request->eos();
if (!request->blocks().empty()) {
for (int i = 0; i < request->blocks_size(); i++) {
std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
pblock_ptr->Swap(const_cast<PBlock*>(&request->blocks(i)));
RETURN_IF_ERROR(recvr->add_block(
std::move(pblock_ptr), request->sender_id(), request->be_number(),
request->packet_seq() - request->blocks_size() + i, eos ? nullptr : done,
wait_for_worker, cpu_time_stop_watch.elapsed_time()));
}
}

// old logic, for compatibility
if (request->has_block()) {
std::unique_ptr<PBlock> pblock_ptr {
const_cast<PTransmitDataParams*>(request)->release_block()};
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) {
}
}
if (eos || block->column_metas_size()) {
RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos}));
RETURN_IF_ERROR(_buffer->add_block(this, {std::move(block), eos}));
}
return Status::OK();
}
Expand All @@ -188,7 +188,7 @@ Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& blo
_eos_send = true;
}
if (eos || block->get_block()->column_metas_size()) {
RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
RETURN_IF_ERROR(_buffer->add_block(this, {block, eos}));
}
return Status::OK();
}
Expand Down
5 changes: 2 additions & 3 deletions be/test/vec/exec/exchange_sink_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,8 @@ struct SinkWithChannel {
std::map<int64_t, std::shared_ptr<Channel>> channels;
Status add_block(int64_t id, bool eos) {
auto channel = channels[id];
TransmitInfo transmitInfo {
.channel = channel.get(), .block = std::make_unique<PBlock>(), .eos = eos};
return buffer->add_block(std::move(transmitInfo));
TransmitInfo transmitInfo {.block = std::make_unique<PBlock>(), .eos = eos};
return buffer->add_block(channel.get(), std::move(transmitInfo));
}
};

Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_ES_PARALLEL_SCROLL = "enable_es_parallel_scroll";

public static final String EXCHANGE_MULTI_BLOCKS_BYTE_SIZE = "exchange_multi_blocks_byte_size";

public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
Expand Down Expand Up @@ -2243,6 +2245,10 @@ public boolean isEnableHboNonStrictMatchingMode() {
"When processing both \\n and \\r\\n as CSV line separators, should \\r be retained?"})
public boolean keepCarriageReturn = false;

@VariableMgr.VarAttr(name = EXCHANGE_MULTI_BLOCKS_BYTE_SIZE,
description = {"Enable exchange to send multiple blocks in one RPC. Default is 256KB. A negative"
+ " value disables multi-block exchange."})
public int exchangeMultiBlocksByteSize = 256 * 1024;

@VariableMgr.VarAttr(name = FORCE_JNI_SCANNER,
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
Expand Down Expand Up @@ -2591,6 +2597,12 @@ public void initFuzzyModeVariables() {
this.disableStreamPreaggregations = random.nextBoolean();
this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
this.enableParallelResultSink = random.nextBoolean();

// 4KB = 4 * 1024 bytes
int minBytes = 4 * 1024;
// 10MB = 10 * 1024 * 1024 bytes
int maxBytes = 10 * 1024 * 1024;
this.exchangeMultiBlocksByteSize = minBytes + (int) (random.nextDouble() * (maxBytes - minBytes));
int randomInt = random.nextInt(4);
if (randomInt % 2 == 0) {
this.rewriteOrToInPredicateThreshold = 100000;
Expand Down Expand Up @@ -4165,6 +4177,7 @@ public TQueryOptions toThrift() {
tResult.setEnableRuntimeFilterPartitionPrune(enableRuntimeFilterPartitionPrune);

tResult.setMinimumOperatorMemoryRequiredKb(minimumOperatorMemoryRequiredKB);
tResult.setExchangeMultiBlocksByteSize(exchangeMultiBlocksByteSize);
return tResult;
}

Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/internal_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message PTransmitDataParams {
optional bool transfer_by_attachment = 10 [default = false];
optional PUniqueId query_id = 11;
optional PStatus exec_status = 12;
repeated PBlock blocks = 13;
};

message PTransmitDataResult {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ struct TQueryOptions {
162: optional bool dump_heap_profile_when_mem_limit_exceeded = false
163: optional bool inverted_index_compatible_read = false
164: optional bool check_orc_init_sargs_success = false
165: optional i32 exchange_multi_blocks_byte_size = 262144

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
Expand Down
Loading