Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ Status NodeChannel::_wait_in_flight_packet() {
if (!_has_in_flight_packet) {
return Status::OK();
}

SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns());
_add_batch_closure->join();
_has_in_flight_packet = false;
if (_add_batch_closure->cntl.Failed()) {
Expand All @@ -214,6 +216,7 @@ Status NodeChannel::_send_cur_batch(bool eos) {
_add_batch_request.set_eos(eos);
_add_batch_request.set_packet_seq(_next_packet_seq);
if (_batch->num_rows() > 0) {
SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns());
_batch->serialize(_add_batch_request.mutable_row_batch());
}

Expand Down Expand Up @@ -498,6 +501,8 @@ Status OlapTableSink::prepare(RuntimeState* state) {
_validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
_open_timer = ADD_TIMER(_profile, "OpenTime");
_close_timer = ADD_TIMER(_profile, "CloseTime");
_wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlightPacketTime");
_serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime");

// open all channels
auto& partitions = _partition->get_partitions();
Expand Down Expand Up @@ -602,6 +607,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
COUNTER_SET(_send_data_timer, _send_data_ns);
COUNTER_SET(_convert_batch_timer, _convert_batch_ns);
COUNTER_SET(_validate_data_timer, _validate_data_ns);
COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns);
COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns);

state->update_num_rows_load_filtered(_number_filtered_rows);
} else {
for (auto channel : _channels) {
Expand Down
9 changes: 9 additions & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ class OlapTableSink : public DataSink {
return _profile;
}

// these 2 counters does not thread-safe. make sure only one thread
// at a time can modify them.
int64_t* mutable_wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; }
int64_t* mutable_serialize_batch_ns() { return &_serialize_batch_ns; }

private:
// convert input batch to output batch which will be loaded into OLAP table.
// this is only used in insert statement.
Expand Down Expand Up @@ -236,6 +241,8 @@ class OlapTableSink : public DataSink {
int64_t _convert_batch_ns = 0;
int64_t _validate_data_ns = 0;
int64_t _send_data_ns = 0;
int64_t _wait_in_flight_packet_ns = 0;
int64_t _serialize_batch_ns = 0;
int64_t _number_input_rows = 0;
int64_t _number_output_rows = 0;
int64_t _number_filtered_rows = 0;
Expand All @@ -248,6 +255,8 @@ class OlapTableSink : public DataSink {
RuntimeProfile::Counter* _validate_data_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _wait_in_flight_packet_timer = nullptr;
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
};

}
Expand Down
10 changes: 5 additions & 5 deletions be/src/http/action/mini_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,20 +810,20 @@ void MiniLoadAction::_new_handle(HttpRequest* req) {
}
}

if (!ctx->status.ok()) {
// if failed to commit and status is not PUBLISH_TIMEOUT, rollback the txn.
// PUBLISH_TIMEOUT is treated as OK in mini load, because user will use show load stmt
// to see the final result.
if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) {
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx);
if (ctx->status.code() == TStatusCode::PUBLISH_TIMEOUT) {
ctx->status = Status::PublishTimeout("transation has been rollback because it was timeout in phase of publish");
}
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
ctx->body_sink->cancel();
}
}

std::string str = to_json(ctx->status);
std::string str = ctx->to_json_for_mini_load();
HttpChannel::send_reply(req, str);
}

Expand Down
43 changes: 43 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,49 @@ std::string StreamLoadContext::to_json() const {
return s.GetString();
}

/*
* The old mini load result format is as followes:
* (which defined in src/util/json_util.cpp)
*
* {
* "status" : "Success"("Fail"),
* "msg" : "xxxx"
* }
*
*/
std::string StreamLoadContext::to_json_for_mini_load() const {
rapidjson::StringBuffer s;
rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(s);
writer.StartObject();

// status
bool show_ok = true;
writer.Key("status");
switch (status.code()) {
case TStatusCode::OK:
writer.String("Success");
break;
case TStatusCode::PUBLISH_TIMEOUT:
// treat PUBLISH_TIMEOUT as OK in mini load
writer.String("Success");
break;
default:
writer.String("Fail");
show_ok = false;
break;
}
// msg
writer.Key("msg");
if (status.ok() || show_ok) {
writer.String("OK");
} else {
writer.String(status.get_error_msg().c_str());
}
writer.EndObject();
return s.GetString();
}


std::string StreamLoadContext::brief(bool detail) const {
std::stringstream ss;
ss << "id=" << id << ", job id=" << job_id << ", txn id=" << txn_id << ", label=" << label;
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class StreamLoadContext {
}

std::string to_json() const;
// the old mini load result format is not same as stream load.
// add this function for compatible with old mini load result format.
std::string to_json_for_mini_load() const;

// return the brief info of this context.
// also print the load source info if detail is set to true
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/tablet_writer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) {
// Normal case, already open by other sender
return Status::OK();
}
LOG(INFO) << "open tablets channel: " << _key;
_txn_id = params.txn_id();
_index_id = params.index_id();
_schema = new OlapTableSchemaParam();
Expand Down Expand Up @@ -170,6 +171,7 @@ Status TabletsChannel::close(int sender_id, bool* finished,
*finished = (_num_remaining_senders == 0);
return _close_status;
}
LOG(INFO) << "close tablets channel: " << _key;
for (auto pid : partition_ids) {
_partition_ids.emplace(pid);
}
Expand Down Expand Up @@ -251,7 +253,6 @@ TabletWriterMgr::~TabletWriterMgr() {

Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) {
TabletsChannelKey key(params.id(), params.index_id());
LOG(INFO) << "open tablets writer channel: " << key;
std::shared_ptr<TabletsChannel> channel;
{
std::lock_guard<std::mutex> l(_lock);
Expand Down