diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp index 7591058e1c2242..82459a855e5668 100644 --- a/be/src/runtime/data_stream_sender.cpp +++ b/be/src/runtime/data_stream_sender.cpp @@ -111,10 +111,15 @@ class DataStreamSender::Channel { // if batch is nullptr, send the eof packet Status send_batch(PRowBatch* batch, bool eos = false); - // Flush buffered rows and close channel. - // Returns error status if any of the preceding rpcs failed, OK otherwise. + // Flush buffered rows and close channel. This function don't wait the response + // of close operation, client should call close_wait() to finish channel's close. + // We split one close operation into two phases in order to make multiple channels + // can run parallel. void close(RuntimeState* state); + // Get close wait's response, to finish channel close operation. + void close_wait(RuntimeState* state); + int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; } @@ -283,13 +288,19 @@ Status DataStreamSender::Channel::close_internal() { } else { RETURN_IF_ERROR(send_batch(nullptr, true)); } - RETURN_IF_ERROR(_wait_last_brpc()); - _need_close = false; + // Don't wait for the last packet to finish, left it to close_wait. return Status::OK(); } void DataStreamSender::Channel::close(RuntimeState* state) { state->log_error(close_internal().get_error_msg()); +} + +void DataStreamSender::Channel::close_wait(RuntimeState* state) { + if (_need_close) { + state->log_error(_wait_last_brpc().get_error_msg()); + _need_close = false; + } _batch.reset(); } @@ -591,9 +602,14 @@ Status DataStreamSender::compute_range_part_code( Status DataStreamSender::close(RuntimeState* state, Status exec_status) { // TODO: only close channels that didn't have any errors + // make all channels close parallel for (int i = 0; i < _channels.size(); ++i) { _channels[i]->close(state); } + // wait all channels to finish + for (int i = 0; i < _channels.size(); ++i) { + _channels[i]->close_wait(state); + } for (auto iter : _partition_infos) { RETURN_IF_ERROR(iter->close(state)); }