From c1f1479ba36947f9fe2fc120ebb7db8c3f2305a0 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 16 Jul 2019 10:20:51 +0800 Subject: [PATCH 1/6] Modify the result json format of mini load. Mini load is now using the stream load framework. But we should keep the mini load return behavior and result json format the same as before. So a PUBLISH_TIMEOUT error should be treated as OK in mini load. --- be/src/http/action/mini_load.cpp | 10 ++-- .../stream_load/stream_load_context.cpp | 59 +++++++++++++++++++ .../runtime/stream_load/stream_load_context.h | 3 + be/src/runtime/tablet_writer_mgr.cpp | 3 +- 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index f423d63f8cb31a..097550f220dfea 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -810,12 +810,12 @@ void MiniLoadAction::_new_handle(HttpRequest* req) { } } - if (!ctx->status.ok()) { + // if failed to commit and status is not PUBLISH_TIMEOUT, rollback the txn. + // PUBLISH_TIMEOUT is treated as OK in mini load, because user will use show load stmt + // to see the final result. 
+ if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) { if (ctx->need_rollback) { _exec_env->stream_load_executor()->rollback_txn(ctx); - if (ctx->status.code() == TStatusCode::PUBLISH_TIMEOUT) { - ctx->status = Status::PublishTimeout("transation has been rollback because it was timeout in phase of publish"); - } ctx->need_rollback = false; } if (ctx->body_sink.get() != nullptr) { @@ -823,7 +823,7 @@ void MiniLoadAction::_new_handle(HttpRequest* req) { } } - std::string str = to_json(ctx->status); + std::string str = ctx->to_json_for_mini_load(); HttpChannel::send_reply(req, str); } diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index c4ec694a00c9d7..ecae769b3458a0 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -76,6 +76,65 @@ std::string StreamLoadContext::to_json() const { return s.GetString(); } +/* + * The old mini load result format is as follows: + * (which is defined in src/util/json_util.cpp) + * + * { + * "status" : "Success"("Fail"), + * "msg" : "xxxx" + * } + * + * New format is: + * + * { + * "txn_id" : 123456 + * "status" : "Success"("Fail"), + * "msg" : "xxxx" + * } + * + */ +std::string StreamLoadContext::to_json_for_mini_load() const { + rapidjson::StringBuffer s; + rapidjson::PrettyWriter writer(s); + writer.StartObject(); + + // for unification, use underscore, not camel cased + writer.Key("txn_id"); + writer.Int64(txn_id); + + // status + bool show_ok = true; + writer.Key("status"); + switch (status.code()) { + case TStatusCode::OK: + writer.String("Success"); + break; + case TStatusCode::PUBLISH_TIMEOUT: + // treat PUBLISH_TIMEOUT as OK in mini load + writer.String("Success"); + break; + case TStatusCode::LABEL_ALREADY_EXISTS: + writer.String("Label Already Exists"); + show_ok = false; + break; + default: + writer.String("Fail"); + show_ok = false; + break; + } + // msg + 
writer.Key("msg"); + if (status.ok() || show_ok) { + writer.String("OK"); + } else { + writer.String(status.get_error_msg().c_str()); + } + writer.EndObject(); + return s.GetString(); +} + + std::string StreamLoadContext::brief(bool detail) const { std::stringstream ss; ss << "id=" << id << ", job id=" << job_id << ", txn id=" << txn_id << ", label=" << label; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index c419290ca37c9d..f2c5c519a497ee 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -101,6 +101,9 @@ class StreamLoadContext { } std::string to_json() const; + // the old mini load result format is not same as stream load. + // add this function for compatible with old mini load result format. + std::string to_json_for_mini_load() const; // return the brief info of this context. // also print the load source info if detail is set to true diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index d272afdac69986..0e397cc6db9282 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -104,6 +104,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& params) { // Normal case, already open by other sender return Status::OK(); } + LOG(INFO) << "open tablets channel: " << _key; _txn_id = params.txn_id(); _index_id = params.index_id(); _schema = new OlapTableSchemaParam(); @@ -170,6 +171,7 @@ Status TabletsChannel::close(int sender_id, bool* finished, *finished = (_num_remaining_senders == 0); return _close_status; } + LOG(INFO) << "close tablets channel: " << _key; for (auto pid : partition_ids) { _partition_ids.emplace(pid); } @@ -251,7 +253,6 @@ TabletWriterMgr::~TabletWriterMgr() { Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) { TabletsChannelKey key(params.id(), params.index_id()); - LOG(INFO) << "open tablets writer 
channel: " << key; std::shared_ptr channel; { std::lock_guard l(_lock); From 24b504dd5798198a3bf9bfa9e795763eb0a6c119 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 16 Jul 2019 11:21:28 +0800 Subject: [PATCH 2/6] remove txn id in mini load result --- be/src/runtime/stream_load/stream_load_context.cpp | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index ecae769b3458a0..813a63906726ee 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -85,24 +85,12 @@ std::string StreamLoadContext::to_json() const { * "msg" : "xxxx" * } * - * New format is: - * - * { - * "txn_id" : 123456 - * "status" : "Success"("Fail"), - * "msg" : "xxxx" - * } - * */ std::string StreamLoadContext::to_json_for_mini_load() const { rapidjson::StringBuffer s; rapidjson::PrettyWriter writer(s); writer.StartObject(); - // for unification, use underscore, not camel cased - writer.Key("txn_id"); - writer.Int64(txn_id); - // status bool show_ok = true; writer.Key("status"); From 2bcbdd5d16425256d2fc2d86b7d6c5a4d344007b Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 16 Jul 2019 13:33:23 +0800 Subject: [PATCH 3/6] add wait_in_flight_packet_timer --- be/src/exec/tablet_sink.cpp | 8 ++++++++ be/src/exec/tablet_sink.h | 7 +++++++ 2 files changed, 15 insertions(+) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 9369c7370963f9..f7e637ee43dbfe 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -196,6 +196,8 @@ Status NodeChannel::_wait_in_flight_packet() { if (!_has_in_flight_packet) { return Status::OK(); } + + SCOPED_RAW_TIMER(_parent->wait_in_flight_packet_ns()); _add_batch_closure->join(); _has_in_flight_packet = false; if (_add_batch_closure->cntl.Failed()) { @@ -214,6 +216,7 @@ Status NodeChannel::_send_cur_batch(bool eos) { 
_add_batch_request.set_eos(eos); _add_batch_request.set_packet_seq(_next_packet_seq); if (_batch->num_rows() > 0) { + SCOPED_RAW_TIMER(_parent->serialize_batch_ns()); _batch->serialize(_add_batch_request.mutable_row_batch()); } @@ -498,6 +501,8 @@ Status OlapTableSink::prepare(RuntimeState* state) { _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); _close_timer = ADD_TIMER(_profile, "CloseTime"); + _wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlighPacketTime"); + _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); // open all channels auto& partitions = _partition->get_partitions(); @@ -602,6 +607,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_send_data_timer, _send_data_ns); COUNTER_SET(_convert_batch_timer, _convert_batch_ns); COUNTER_SET(_validate_data_timer, _validate_data_ns); + COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns); + COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); + state->update_num_rows_load_filtered(_number_filtered_rows); } else { for (auto channel : _channels) { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 3bd5941f3ce915..49a3e87e0bd43e 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -168,6 +168,9 @@ class OlapTableSink : public DataSink { return _profile; } + int64_t* wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; } + int64_t* serialize_batch_ns() { return &_serialize_batch_ns; } + private: // convert input batch to output batch which will be loaded into OLAP table. // this is only used in insert statement. 
@@ -236,6 +239,8 @@ class OlapTableSink : public DataSink { int64_t _convert_batch_ns = 0; int64_t _validate_data_ns = 0; int64_t _send_data_ns = 0; + int64_t _wait_in_flight_packet_ns = 0; + int64_t _serialize_batch_ns = 0; int64_t _number_input_rows = 0; int64_t _number_output_rows = 0; int64_t _number_filtered_rows = 0; @@ -248,6 +253,8 @@ class OlapTableSink : public DataSink { RuntimeProfile::Counter* _validate_data_timer = nullptr; RuntimeProfile::Counter* _open_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; + RuntimeProfile::Counter* _wait_in_flight_packet_timer = nullptr; + RuntimeProfile::Counter* _serialize_batch_timer = nullptr; }; } From 40a736d0b1452c71b2ad1d3dfcbbab013f6b7c8c Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 16 Jul 2019 14:54:21 +0800 Subject: [PATCH 4/6] fix misspell --- be/src/exec/tablet_sink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index f7e637ee43dbfe..2c676eb066912a 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -501,7 +501,7 @@ Status OlapTableSink::prepare(RuntimeState* state) { _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime"); _open_timer = ADD_TIMER(_profile, "OpenTime"); _close_timer = ADD_TIMER(_profile, "CloseTime"); - _wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlighPacketTime"); + _wait_in_flight_packet_timer = ADD_TIMER(_profile, "WaitInFlightPacketTime"); _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime"); // open all channels From a655178ba5b06c7f9fd40eff6dff4f332d74d853 Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 16 Jul 2019 16:32:12 +0800 Subject: [PATCH 5/6] remove LABEL_ALREADY_EXISTS case in mini load result to_json() method --- be/src/runtime/stream_load/stream_load_context.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_context.cpp 
b/be/src/runtime/stream_load/stream_load_context.cpp index 813a63906726ee..27981526b6f953 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -102,10 +102,6 @@ std::string StreamLoadContext::to_json_for_mini_load() const { // treat PUBLISH_TIMEOUT as OK in mini load writer.String("Success"); break; - case TStatusCode::LABEL_ALREADY_EXISTS: - writer.String("Label Already Exists"); - show_ok = false; - break; default: writer.String("Fail"); show_ok = false; From 55baf807876f255bae83c5024f091ba80ac525eb Mon Sep 17 00:00:00 2001 From: morningman Date: Tue, 16 Jul 2019 16:45:35 +0800 Subject: [PATCH 6/6] change counter name to mutable --- be/src/exec/tablet_sink.cpp | 4 ++-- be/src/exec/tablet_sink.h | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 2c676eb066912a..3be046a1c8069d 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -197,7 +197,7 @@ Status NodeChannel::_wait_in_flight_packet() { return Status::OK(); } - SCOPED_RAW_TIMER(_parent->wait_in_flight_packet_ns()); + SCOPED_RAW_TIMER(_parent->mutable_wait_in_flight_packet_ns()); _add_batch_closure->join(); _has_in_flight_packet = false; if (_add_batch_closure->cntl.Failed()) { @@ -216,7 +216,7 @@ Status NodeChannel::_send_cur_batch(bool eos) { _add_batch_request.set_eos(eos); _add_batch_request.set_packet_seq(_next_packet_seq); if (_batch->num_rows() > 0) { - SCOPED_RAW_TIMER(_parent->serialize_batch_ns()); + SCOPED_RAW_TIMER(_parent->mutable_serialize_batch_ns()); _batch->serialize(_add_batch_request.mutable_row_batch()); } diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 49a3e87e0bd43e..dc7dc370412f61 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -168,8 +168,10 @@ class OlapTableSink : public DataSink { return _profile; } - int64_t* wait_in_flight_packet_ns() { return 
&_wait_in_flight_packet_ns; } - int64_t* serialize_batch_ns() { return &_serialize_batch_ns; } + // these 2 counters are not thread-safe. make sure only one thread + // at a time can modify them. + int64_t* mutable_wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; } + int64_t* mutable_serialize_batch_ns() { return &_serialize_batch_ns; } private: // convert input batch to output batch which will be loaded into OLAP table.