27 changes: 23 additions & 4 deletions be/src/exec/data_sink.cpp
@@ -33,6 +33,7 @@
#include "runtime/mysql_table_sink.h"
#include "runtime/odbc_table_sink.h"
#include "runtime/result_sink.h"
#include "runtime/result_file_sink.h"
#include "runtime/runtime_state.h"
#include "util/logging.h"

@@ -43,7 +44,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
const TPlanFragmentExecParams& params,
const RowDescriptor& row_desc,
bool is_vec,
boost::scoped_ptr<DataSink>* sink) {
boost::scoped_ptr<DataSink>* sink,
DescriptorTbl& desc_tbl) {
DataSink* tmp_sink = NULL;

switch (thrift_sink.type) {
@@ -66,7 +68,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
sink->reset(tmp_sink);
break;
}
case TDataSinkType::RESULT_SINK:
case TDataSinkType::RESULT_SINK: {
if (!thrift_sink.__isset.result_sink) {
return Status::InternalError("Missing data buffer sink.");
}
@@ -78,14 +80,30 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
}
sink->reset(tmp_sink);
break;
case TDataSinkType::MEMORY_SCRATCH_SINK:
}
case TDataSinkType::RESULT_FILE_SINK: {
if (!thrift_sink.__isset.result_file_sink) {
return Status::InternalError("Missing result file sink.");
}
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
params.destinations, pool, params.sender_id, desc_tbl);
} else {
tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
}
sink->reset(tmp_sink);
break;
}
case TDataSinkType::MEMORY_SCRATCH_SINK: {
if (!thrift_sink.__isset.memory_scratch_sink) {
return Status::InternalError("Missing data buffer sink.");
}

tmp_sink = new MemoryScratchSink(row_desc, output_exprs, thrift_sink.memory_scratch_sink);
sink->reset(tmp_sink);
break;
}
case TDataSinkType::MYSQL_TABLE_SINK: {
#ifdef DORIS_WITH_MYSQL
if (!thrift_sink.__isset.mysql_table_sink) {
@@ -138,7 +156,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
break;
}

default:
default: {
std::stringstream error_msg;
std::map<int, const char*>::const_iterator i =
_TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
@@ -151,6 +169,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
error_msg << str << " not implemented.";
return Status::InternalError(error_msg.str());
}
}

if (sink->get() != NULL) {
RETURN_IF_ERROR((*sink)->init(thrift_sink));
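As background on the braces added around the RESULT_SINK, MEMORY_SCRATCH_SINK, and default bodies in this hunk: giving each case its own scope is what C++ requires whenever a case introduces an initialized local that a later label would otherwise jump past, and it keeps the new RESULT_FILE_SINK branch consistent with its neighbours. A minimal standalone illustration of that language rule (not Doris code):

#include <iostream>
#include <string>

// Without the braces around case 0, the declaration of 'label' would still be
// in scope at 'default:', and jumping to 'default:' would skip its
// initialization, making the program ill-formed. The enclosing scope ends the
// lifetime of 'label' before the next label is reached.
void dispatch(int type) {
    switch (type) {
    case 0: {
        std::string label = "result sink";
        std::cout << label << '\n';
        break;
    }
    default:
        std::cout << "not implemented\n";
        break;
    }
}

int main() {
    dispatch(0);
    dispatch(1);
    return 0;
}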
4 changes: 3 additions & 1 deletion be/src/exec/data_sink.h
@@ -24,6 +24,7 @@
#include "common/status.h"
#include "gen_cpp/DataSinks_types.h"
#include "gen_cpp/Exprs_types.h"
#include "runtime/descriptors.h"
#include "runtime/mem_tracker.h"
#include "runtime/query_statistics.h"

@@ -75,7 +76,8 @@ class DataSink {
const TPlanFragmentExecParams& params,
const RowDescriptor& row_desc,
bool is_vec,
boost::scoped_ptr<DataSink>* sink);
boost::scoped_ptr<DataSink>* sink,
DescriptorTbl& desc_tbl);

// Returns the runtime profile for the sink.
virtual RuntimeProfile* profile() = 0;
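For orientation, a hedged sketch of what a call site might look like after this signature change. Only the create_data_sink name, the new desc_tbl parameter, and the overall parameter shape shown above come from the diff; the wrapper function, the output_exprs type, and how the DescriptorTbl is obtained are illustrative assumptions, not code from this PR.

#include <vector>

#include "exec/data_sink.h"

namespace doris {

// Illustrative wrapper only: the real callers live elsewhere in the BE and are
// not part of this diff. The point is that a DescriptorTbl reference must now
// be threaded through to the factory so the RESULT_FILE_SINK branch can pass
// it to ResultFileSink when the sink has downstream destinations.
Status prepare_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                    const std::vector<TExpr>& output_exprs,
                    const TPlanFragmentExecParams& params,
                    const RowDescriptor& row_desc, DescriptorTbl& desc_tbl,
                    boost::scoped_ptr<DataSink>* sink) {
    return DataSink::create_data_sink(pool, thrift_sink, output_exprs, params,
                                      row_desc, /*is_vec=*/false, sink, desc_tbl);
}

} // namespace doris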
5 changes: 0 additions & 5 deletions be/src/exec/s3_writer.cpp
@@ -40,7 +40,6 @@ S3Writer::S3Writer(const std::map<std::string, std::string>& properties, const s
: _properties(properties),
_path(path),
_uri(path),
_sync_needed(false),
_client(ClientFactory::instance().create(_properties)),
_temp_file(std::make_shared<Aws::Utils::TempFile>(
std::ios_base::binary | std::ios_base::trunc | std::ios_base::in |
@@ -80,7 +79,6 @@ Status S3Writer::write(const uint8_t* buf, size_t buf_len, size_t* written_len)
if (!_temp_file) {
return Status::BufferAllocFailed("The internal temporary file is not writable.");
}
_sync_needed = true;
_temp_file->write(reinterpret_cast<const char*>(buf), buf_len);
if (!_temp_file->good()) {
return Status::BufferAllocFailed("Could not append to the internal temporary file.");
@@ -101,9 +99,6 @@ Status S3Writer::_sync() {
if (!_temp_file) {
return Status::BufferAllocFailed("The internal temporary file is not writable.");
}
if (!_sync_needed) {
return Status::OK();
}
CHECK_S3_CLIENT(_client);
Aws::S3::Model::PutObjectRequest request;
request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
1 change: 0 additions & 1 deletion be/src/exec/s3_writer.h
@@ -52,7 +52,6 @@ class S3Writer : public FileWriter {
const std::map<std::string, std::string>& _properties;
std::string _path;
S3URI _uri;
bool _sync_needed;
std::shared_ptr<Aws::S3::S3Client> _client;
std::shared_ptr<Aws::Utils::TempFile> _temp_file;
};
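The two S3Writer hunks above drop the _sync_needed dirty flag, so _sync() no longer returns early when nothing has been written; every sync now issues the PutObject request, which presumably is what guarantees that even an empty result file produces an object in S3. A standalone toy sketch of that behavioural difference (no AWS SDK involved, not the real S3Writer):

#include <iostream>
#include <string>

// Toy stand-in for S3Writer: "upload" just prints instead of calling PutObject.
class ToyWriter {
public:
    explicit ToyWriter(bool use_dirty_flag) : _use_dirty_flag(use_dirty_flag) {}

    void write(const std::string& data) {
        _buffer += data;
        _dirty = true;
    }

    void sync() {
        if (_use_dirty_flag && !_dirty) {
            // Old behaviour: nothing written since the last sync, skip the upload.
            std::cout << "skip upload\n";
            return;
        }
        // New behaviour: always upload, so even an empty buffer produces an object.
        std::cout << "upload " << _buffer.size() << " bytes\n";
        _dirty = false;
    }

private:
    bool _use_dirty_flag;
    bool _dirty = false;
    std::string _buffer;
};

int main() {
    ToyWriter old_writer(/*use_dirty_flag=*/true);
    old_writer.sync();  // prints "skip upload": an empty result never reaches S3

    ToyWriter new_writer(/*use_dirty_flag=*/false);
    new_writer.sync();  // prints "upload 0 bytes": an empty object is still created
    return 0;
}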
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
@@ -40,6 +40,7 @@ set(RUNTIME_FILES
raw_value.cpp
raw_value_ir.cpp
result_sink.cpp
result_file_sink.cpp
result_buffer_mgr.cpp
result_writer.cpp
row_batch.cpp
182 changes: 39 additions & 143 deletions be/src/runtime/data_stream_sender.cpp
@@ -29,10 +29,6 @@
#include "common/logging.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "gen_cpp/BackendService.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/data_stream_recvr.h"
@@ -50,152 +46,42 @@
#include "util/debug_util.h"
#include "util/defer_op.h"
#include "util/network_util.h"
#include "util/ref_count_closure.h"
#include "util/thrift_client.h"
#include "util/thrift_util.h"

namespace doris {

// A channel sends data asynchronously via calls to transmit_data
// to a single destination ipaddress/node.
// It has a fixed-capacity buffer and allows the caller either to add rows to
// that buffer individually (add_row()), or circumvent the buffer altogether and send
// TRowBatches directly (send_batch()). Either way, there can only be one in-flight RPC
// at any one time (i.e., sending will block if the most recent rpc hasn't finished,
// which allows the receiver node to throttle the sender by withholding acks).
// *Not* thread-safe.
class DataStreamSender::Channel {
public:
// Create a channel to send data to a particular ipaddress/port/query/node
// combination. buffer_size is specified in bytes and is a soft limit on
// how much tuple data is accumulated before being sent; it only applies
// when data is added via add_row() and not when batches are sent directly via send_batch().
Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
bool send_query_statistics_with_every_batch)
: _parent(parent),
_buffer_size(buffer_size),
_row_desc(row_desc),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
_num_data_bytes_sent(0),
_packet_seq(0),
_need_close(false),
_be_number(0),
_brpc_dest_addr(brpc_dest),
_is_transfer_chain(is_transfer_chain),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
std::string localhost = BackendOptions::get_localhost();
_is_local =
_brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
if (_is_local) {
LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
}
}

virtual ~Channel() {
if (_closure != nullptr && _closure->unref()) {
delete _closure;
}
// release this before the request destructs
_brpc_request.release_finst_id();
}

// Initialize channel.
// Returns OK if successful, error indication otherwise.
Status init(RuntimeState* state);

// Copies a single row into this channel's output buffer and flushes buffer
// if it reaches capacity.
// Returns error status if any of the preceding rpcs failed, OK otherwise.
Status add_row(TupleRow* row);

// Asynchronously sends a row batch.
// Returns the status of the most recently finished transmit_data
// rpc (or OK if there is none that hasn't been reported yet).
// If batch is nullptr, sends the eos packet.
Status send_batch(PRowBatch* batch, bool eos = false);

Status send_local_batch(bool eos);

Status send_local_batch(RowBatch* batch, bool use_move);

// Flush buffered rows and close the channel. This function does not wait for the
// response of the close operation; the caller should call close_wait() to finish
// closing the channel. The close operation is split into two phases so that
// multiple channels can close in parallel.
Status close(RuntimeState* state);

// Get close wait's response, to finish channel close operation.
Status close_wait(RuntimeState* state);

int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }

PRowBatch* pb_batch() { return &_pb_batch; }

std::string get_fragment_instance_id_str() {
UniqueId uid(_fragment_instance_id);
return uid.to_string();
}

TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }

bool is_local() { return _is_local; }

private:
inline Status _wait_last_brpc() {
if (_closure == nullptr) return Status::OK();
auto cntl = &_closure->cntl;
brpc::Join(cntl->call_id());
if (cntl->Failed()) {
std::stringstream ss;
ss << "failed to send brpc batch, error=" << berror(cntl->ErrorCode())
<< ", error_text=" << cntl->ErrorText()
<< ", client: " << BackendOptions::get_localhost();
LOG(WARNING) << ss.str();
return Status::ThriftRpcError(ss.str());
}
return Status::OK();
DataStreamSender::Channel::Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
bool send_query_statistics_with_every_batch)
: _parent(parent),
_buffer_size(buffer_size),
_row_desc(row_desc),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
_num_data_bytes_sent(0),
_packet_seq(0),
_need_close(false),
_be_number(0),
_brpc_dest_addr(brpc_dest),
_is_transfer_chain(is_transfer_chain),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
std::string localhost = BackendOptions::get_localhost();
_is_local =
_brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
if (_is_local) {
LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
}
}

private:
// Serialize _batch into _thrift_batch and send via send_batch().
// Returns send_batch() status.
Status send_current_batch(bool eos = false);
Status close_internal();

DataStreamSender* _parent;
int _buffer_size;

const RowDescriptor& _row_desc;
TUniqueId _fragment_instance_id;
PlanNodeId _dest_node_id;

// the number of TRowBatch.data bytes sent successfully
int64_t _num_data_bytes_sent;
int64_t _packet_seq;

// we're accumulating rows into this batch
boost::scoped_ptr<RowBatch> _batch;

bool _need_close;
int _be_number;

TNetworkAddress _brpc_dest_addr;

// TODO(zc): initused for brpc
PUniqueId _finst_id;
PRowBatch _pb_batch;
PTransmitDataParams _brpc_request;
PBackendService_Stub* _brpc_stub = nullptr;
RefCountClosure<PTransmitDataResult>* _closure = nullptr;
int32_t _brpc_timeout_ms = 500;
// whether the dest can be treated as query statistics transfer chain.
bool _is_transfer_chain;
bool _send_query_statistics_with_every_batch;
bool _is_local;
};
DataStreamSender::Channel::~Channel() {
if (_closure != nullptr && _closure->unref()) {
delete _closure;
}
// release this before the request destructs
_brpc_request.release_finst_id();
}

Status DataStreamSender::Channel::init(RuntimeState* state) {
_be_number = state->be_number();
@@ -371,6 +257,16 @@ Status DataStreamSender::Channel::close_wait(RuntimeState* state) {
return Status::OK();
}

DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc)
: _pool(pool),
_sender_id(sender_id),
_row_desc(row_desc),
_serialize_batch_timer(NULL),
_bytes_sent_counter(NULL),
_local_bytes_send_counter(NULL),
_current_pb_batch(&_pb_batch1) {
}

DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
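The removed Channel comments above describe a two-phase shutdown: close() fires the final RPC for each channel without waiting, and close_wait() later collects the responses, so many channels can close in parallel. A minimal standalone sketch of that pattern, with std::future standing in for the in-flight brpc call; none of this is the actual Doris implementation:

#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

// Stand-in for a Channel: close() starts the asynchronous "close RPC",
// close_wait() blocks until that RPC has finished.
class ToyChannel {
public:
    void close() {
        _pending = std::async(std::launch::async, [] {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        });
    }
    void close_wait() { _pending.get(); }

private:
    std::future<void> _pending;
};

int main() {
    std::vector<ToyChannel> channels(4);

    // Phase 1: kick off every channel's close RPC without waiting.
    for (auto& ch : channels) ch.close();

    // Phase 2: wait for all of them; total latency is roughly one RPC,
    // not one RPC per channel as a combined close-and-wait loop would be.
    for (auto& ch : channels) ch.close_wait();

    std::cout << "all channels closed\n";
    return 0;
}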