From db90a5e59a60c3c81c8a3d7d460a03c1a1a2e675 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Mon, 15 Jul 2019 17:02:56 +0800 Subject: [PATCH 1/4] Add timeout in stream load planner The mini load timeout needs to be passed into the plan options. The timeout property is now set on the put request built in _process_put, and the stream load planner uses it instead of the config default. Without this, the mini load timeout has no effect on the plan. --- be/src/http/action/mini_load.cpp | 3 +++ be/src/runtime/stream_load/stream_load_context.h | 2 +- fe/src/main/java/org/apache/doris/common/Config.java | 5 +++-- .../java/org/apache/doris/planner/StreamLoadPlanner.java | 2 +- .../main/java/org/apache/doris/task/StreamLoadTask.java | 9 +++++++++ gensrc/thrift/FrontendService.thrift | 1 + 6 files changed, 18 insertions(+), 4 deletions(-) diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index 15cd300839dae2..868d9a81c892a0 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -709,6 +709,9 @@ Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) { if (column_separator_it != params.end()) { put_request.__set_columnSeparator(column_separator_it->second); } + if (ctx->timeout_second != -1) { + put_request.__set_timeout(ctx->timeout_second); + } // plan this load TNetworkAddress master_addr = _exec_env->master_info()->network_address; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 8395e9d6dc7e07..c419290ca37c9d 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -129,7 +129,7 @@ class StreamLoadContext { // optional std::string sub_label; double max_filter_ratio = 0.0; - int64_t timeout_second = -1; + int32_t timeout_second = -1; AuthInfo auth; // the following members control the max progress of a consuming diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 
10459be41d506d..c2356cdd362a97 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -349,8 +349,9 @@ public class Config extends ConfigBase { public static int pull_load_task_default_timeout_second = 14400; // 4 hour /* - * Default mini load timeout + * Default non-streaming mini load timeout */ + @Deprecated @ConfField(mutable = true, masterOnly = true) public static int mini_load_default_timeout_second = 3600; // 1 hour @@ -361,7 +362,7 @@ public class Config extends ConfigBase { public static int insert_load_default_timeout_second = 3600; // 1 hour /* - * Default stream load timeout + * Default stream load and streaming mini load timeout */ @ConfField(mutable = true, masterOnly = true) public static int stream_load_default_timeout_second = 300; // 300s diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 6ab185624ada9c..5a8f417db4a46f 100644 --- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -141,7 +141,7 @@ public TExecPlanFragmentParams plan() throws UserException { params.setParams(execParams); TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setQuery_type(TQueryType.LOAD); - queryOptions.setQuery_timeout(Config.stream_load_default_timeout_second); + queryOptions.setQuery_timeout(streamLoadTask.getTimeout()); params.setQuery_options(queryOptions); TQueryGlobals queryGlobals = new TQueryGlobals(); queryGlobals.setNow_string(DATE_FORMAT.format(new Date())); diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java index 56013589b1bfb7..57059fec8e4f51 100644 --- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -25,6 +25,7 @@ import 
org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.thrift.TFileFormatType; @@ -56,6 +57,7 @@ public class StreamLoadTask { private String partitions; private String path; private boolean negative; + private int timeout = Config.stream_load_default_timeout_second; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) { this.id = id; @@ -104,6 +106,10 @@ public boolean getNegative() { return negative; } + public int getTimeout() { + return timeout; + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), request.getFileType(), request.getFormatType()); @@ -134,6 +140,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws if (request.isSetNegative()) { negative = request.isNegative(); } + if (request.isSetTimeout()) { + timeout = request.getTimeout(); + } } public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 4a489560579746..da786af3034ba0 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -521,6 +521,7 @@ struct TStreamLoadPutRequest { 15: optional string partitions 16: optional i64 auth_code 17: optional bool negative + 18: optional i32 timeout } struct TStreamLoadPutResult { From 66317f40fd67085e804bdab5f129cd8489fa768b Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Mon, 15 Jul 2019 17:17:21 +0800 Subject: [PATCH 2/4] Fix bug --- .../java/org/apache/doris/planner/StreamLoadPlanner.java | 5 ++--- 
gensrc/thrift/FrontendService.thrift | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 5a8f417db4a46f..68476b02f828bd 100644 --- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -17,17 +17,16 @@ package org.apache.doris.planner; -import org.apache.doris.catalog.AggregateType; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.task.StreamLoadTask; diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index da786af3034ba0..8e59d191d85e23 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -483,6 +483,7 @@ struct TLoadTxnBeginRequest { 7: required string label 8: optional i64 timestamp 9: optional i64 auth_code + // The real value of timeout should be i32. i64 ensures the compatibility of interface. 
10: optional i64 timeout } From b5f80a4819766d19f96ebb16b6f3906ac8293db2 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Mon, 15 Jul 2019 18:54:59 +0800 Subject: [PATCH 3/4] Fix bug --- .../java/org/apache/doris/load/loadv2/MiniLoadJob.java | 4 ++-- .../apache/doris/transaction/GlobalTransactionMgr.java | 10 ++++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 18bbc0dd5b47d2..0c2ce00c0b577e 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -55,7 +55,7 @@ public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFound if (request.isSetTimeout_second()) { this.timeoutSecond = request.getTimeout_second(); } else { - this.timeoutSecond = Config.mini_load_default_timeout_second; + this.timeoutSecond = Config.stream_load_default_timeout_second; } if (request.isSetMax_filter_ratio()) { this.maxFilterRatio = request.getMax_filter_ratio(); @@ -90,7 +90,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti transactionId = Catalog.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(), TransactionState.LoadJobSourceType.BACKEND_STREAMING, id, - timeoutSecond - 1); + timeoutSecond); } @Override diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index f64a01344fedbd..b202d2d431142a 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -359,6 +359,9 @@ public void commitTransaction(long dbId, long transactionId, List tabletBackends = tablet.getBackendIds(); 
totalInvolvedBackends.addAll(tabletBackends); Set commitBackends = tabletToBackends.get(tabletId); + // save the error replica ids for current tablet + // this param is used for log + Set errorBackendIdsForTablet = Sets.newHashSet(); for (long tabletBackend : tabletBackends) { Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend); if (replica == null) { @@ -386,6 +389,7 @@ public void commitTransaction(long dbId, long transactionId, List Date: Mon, 15 Jul 2019 20:34:33 +0800 Subject: [PATCH 4/4] Add log of label, txn and query id in mini load --- be/src/http/action/mini_load.cpp | 1 + be/src/runtime/stream_load/stream_load_executor.cpp | 3 +++ be/src/runtime/tablet_writer_mgr.cpp | 1 + fe/src/main/java/org/apache/doris/common/Config.java | 2 +- 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index 868d9a81c892a0..39cea2f8544d60 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -664,6 +664,7 @@ Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) { ctx->txn_id = res.txn_id; // txn has been begun in fe ctx->need_rollback = true; + LOG(INFO) << "load:" << ctx->label << " txn:" << res.txn_id << " has been begun in fe"; return Status::OK(); } diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index ef05363395ca13..dddee816487a02 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -47,6 +47,9 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { // submit this params #ifndef BE_TEST ctx->ref(); + LOG(INFO) << "begin to execute job:" << ctx->label + << " with txn id:" << ctx->txn_id + << " with query id:" << print_id(ctx->put_result.params.params.query_id); auto st = _exec_env->fragment_mgr()->exec_plan_fragment( ctx->put_result.params, [ctx] 
(PlanFragmentExecutor* executor) { diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index 498a61abdc21cf..9c3b65dae88532 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -251,6 +251,7 @@ TabletWriterMgr::~TabletWriterMgr() { Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) { TabletsChannelKey key(params.id(), params.index_id()); + LOG(INFO) << "open tablets writer channel: " << key; std::shared_ptr channel; { std::lock_guard l(_lock); diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index c2356cdd362a97..6239686272443d 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -365,7 +365,7 @@ public class Config extends ConfigBase { * Default stream load and streaming mini load timeout */ @ConfField(mutable = true, masterOnly = true) - public static int stream_load_default_timeout_second = 300; // 300s + public static int stream_load_default_timeout_second = 600; // 600s /* * Max stream load timeout