diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index 15cd300839dae2..39cea2f8544d60 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -664,6 +664,7 @@ Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) { ctx->txn_id = res.txn_id; // txn has been begun in fe ctx->need_rollback = true; + LOG(INFO) << "load:" << ctx->label << " txn:" << res.txn_id << " has been begun in fe"; return Status::OK(); } @@ -709,6 +710,9 @@ Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) { if (column_separator_it != params.end()) { put_request.__set_columnSeparator(column_separator_it->second); } + if (ctx->timeout_second != -1) { + put_request.__set_timeout(ctx->timeout_second); + } // plan this load TNetworkAddress master_addr = _exec_env->master_info()->network_address; diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 8395e9d6dc7e07..c419290ca37c9d 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -129,7 +129,7 @@ class StreamLoadContext { // optional std::string sub_label; double max_filter_ratio = 0.0; - int64_t timeout_second = -1; + int32_t timeout_second = -1; AuthInfo auth; // the following members control the max progress of a consuming diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index ef05363395ca13..dddee816487a02 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -47,6 +47,9 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { // submit this params #ifndef BE_TEST ctx->ref(); + LOG(INFO) << "begin to execute job:" << ctx->label + << " with txn id:" << ctx->txn_id + << " with query id:" << print_id(ctx->put_result.params.params.query_id); auto st = _exec_env->fragment_mgr()->exec_plan_fragment( ctx->put_result.params, [ctx] (PlanFragmentExecutor* executor) { diff --git a/be/src/runtime/tablet_writer_mgr.cpp b/be/src/runtime/tablet_writer_mgr.cpp index 498a61abdc21cf..9c3b65dae88532 100644 --- a/be/src/runtime/tablet_writer_mgr.cpp +++ b/be/src/runtime/tablet_writer_mgr.cpp @@ -251,6 +251,7 @@ TabletWriterMgr::~TabletWriterMgr() { Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) { TabletsChannelKey key(params.id(), params.index_id()); + LOG(INFO) << "open tablets writer channel: " << key; std::shared_ptr channel; { std::lock_guard l(_lock); diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 10459be41d506d..6239686272443d 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -349,8 +349,9 @@ public class Config extends ConfigBase { public static int pull_load_task_default_timeout_second = 14400; // 4 hour /* - * Default mini load timeout + * Default non-streaming mini load timeout */ + @Deprecated @ConfField(mutable = true, masterOnly = true) public static int mini_load_default_timeout_second = 3600; // 1 hour @@ -361,10 +362,10 @@ public class Config extends ConfigBase { public static int insert_load_default_timeout_second = 3600; // 1 hour /* - * Default stream load timeout + * Default stream load and streaming mini load timeout */ @ConfField(mutable = true, masterOnly = true) - public static int stream_load_default_timeout_second = 300; // 300s + public static int stream_load_default_timeout_second = 600; // 300s /* * Max stream load timeout diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 18bbc0dd5b47d2..0c2ce00c0b577e 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -55,7 +55,7 @@ public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFound if (request.isSetTimeout_second()) { this.timeoutSecond = request.getTimeout_second(); } else { - this.timeoutSecond = Config.mini_load_default_timeout_second; + this.timeoutSecond = Config.stream_load_default_timeout_second; } if (request.isSetMax_filter_ratio()) { this.maxFilterRatio = request.getMax_filter_ratio(); @@ -90,7 +90,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti transactionId = Catalog.getCurrentGlobalTransactionMgr() .beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(), TransactionState.LoadJobSourceType.BACKEND_STREAMING, id, - timeoutSecond - 1); + timeoutSecond); } @Override diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 6ab185624ada9c..68476b02f828bd 100644 --- a/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -17,17 +17,16 @@ package org.apache.doris.planner; -import org.apache.doris.catalog.AggregateType; import org.apache.doris.analysis.Analyzer; -import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.load.LoadErrorHub; import org.apache.doris.task.StreamLoadTask; @@ -141,7 +140,7 @@ public TExecPlanFragmentParams plan() throws UserException { params.setParams(execParams); TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setQuery_type(TQueryType.LOAD); - queryOptions.setQuery_timeout(Config.stream_load_default_timeout_second); + queryOptions.setQuery_timeout(streamLoadTask.getTimeout()); params.setQuery_options(queryOptions); TQueryGlobals queryGlobals = new TQueryGlobals(); queryGlobals.setNow_string(DATE_FORMAT.format(new Date())); diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java index 56013589b1bfb7..57059fec8e4f51 100644 --- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.thrift.TFileFormatType; @@ -56,6 +57,7 @@ public class StreamLoadTask { private String partitions; private String path; private boolean negative; + private int timeout = Config.stream_load_default_timeout_second; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) { this.id = id; @@ -104,6 +106,10 @@ public boolean getNegative() { return negative; } + public int getTimeout() { + return timeout; + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), request.getFileType(), request.getFormatType()); @@ -134,6 +140,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws if (request.isSetNegative()) { negative = request.isNegative(); } + if (request.isSetTimeout()) { + timeout = request.getTimeout(); + } } public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) { diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index f64a01344fedbd..b202d2d431142a 100644 --- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -359,6 +359,9 @@ public void commitTransaction(long dbId, long transactionId, List tabletBackends = tablet.getBackendIds(); totalInvolvedBackends.addAll(tabletBackends); Set commitBackends = tabletToBackends.get(tabletId); + // save the error replica ids for current tablet + // this param is used for log + Set errorBackendIdsForTablet = Sets.newHashSet(); for (long tabletBackend : tabletBackends) { Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend); if (replica == null) { @@ -386,6 +389,7 @@ public void commitTransaction(long dbId, long transactionId, List