diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 88c19348e92111..5aec75b1c158d8 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -149,6 +149,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
     // copy one
     std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;

+    MonotonicStopWatch consumer_watch;
     MonotonicStopWatch watch;
     watch.start();
     Status st;
@@ -166,10 +167,11 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
        }

        if (left_time <= 0 || left_rows <= 0 || left_bytes <=0) {
-            VLOG(3) << "kafka consume batch done"
+            LOG(INFO) << "kafka consume batch done"
                       << ". left time=" << left_time
                       << ", left rows=" << left_rows
-                      << ", left bytes=" << left_bytes;
+                      << ", left bytes=" << left_bytes
+                      << ", consumer time(ms)=" << consumer_watch.elapsed_time() / 1000 / 1000;

            if (left_bytes == ctx->max_batch_size) {
                // nothing to be consumed, cancel it
@@ -182,16 +184,19 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
                DCHECK(left_rows < ctx->max_batch_rows);
                kakfa_pipe->finish();
                ctx->kafka_info->cmt_offset = std::move(cmt_offset);
+                ctx->receive_bytes = ctx->max_batch_size - left_bytes;
                _finished = true;
                return Status::OK;
            }
        }

        // consume 1 message at a time
+        consumer_watch.start();
        RdKafka::Message *msg = _k_consumer->consume(1000 /* timeout, ms */);
+        consumer_watch.stop();
        switch (msg->err()) {
            case RdKafka::ERR_NO_ERROR:
-                LOG(INFO) << "get kafka message"
+                VLOG(3) << "get kafka message"
                          << ", partition: " << msg->partition()
                          << ", offset: " << msg->offset()
                          << ", len: " << msg->len();
diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index ceab4cee7e1eda..39a78608ddc596 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
@@ -1195,7 +1195,7 @@ load_property ::=
    {:
        RESULT = columnsInfo;
    :}
-    | where_clause_without_null:wherePredicate
+    | import_where_stmt:wherePredicate
    {:
        RESULT = wherePredicate;
    :}
diff --git a/fe/src/main/java/org/apache/doris/analysis/AlterLoadErrorUrlClause.java b/fe/src/main/java/org/apache/doris/analysis/AlterLoadErrorUrlClause.java
index d88e65bdd99f71..633ed85e4290f8 100644
--- a/fe/src/main/java/org/apache/doris/analysis/AlterLoadErrorUrlClause.java
+++ b/fe/src/main/java/org/apache/doris/analysis/AlterLoadErrorUrlClause.java
@@ -29,8 +29,7 @@
 import java.util.Map;

 // FORMAT:
-// ALTER SYSTEM SET GLOBAL LOAD_ERROR_URL= "mysql://user:password@host:port[/database[/table]]"
-// ALTER SYSTEM SET GLOBAL LOAD_ERROR_URL= "broker://"
+// ALTER SYSTEM SET LOAD ERRORS HUB properties("type" = "xxx");
 public class AlterLoadErrorUrlClause extends AlterClause {
     private static final Logger LOG = LogManager.getLogger(AlterLoadErrorUrlClause.class);

diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index ae81f0a44792d4..b8c9e0a32193e0 100644
--- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -226,7 +226,7 @@ public void checkLoadProperties(Analyzer analyzer) throws UserException {
         ColumnSeparator columnSeparator = null;
         ImportColumnsStmt importColumnsStmt = null;
         ImportWhereStmt importWhereStmt = null;
-        PartitionNames partitionNames = null;
+        List<String> partitionNames = null;
         for (ParseNode parseNode : loadPropertyList) {
             if (parseNode instanceof ColumnSeparator) {
                 // check column separator
@@ -252,12 +252,13 @@ public void checkLoadProperties(Analyzer analyzer) throws UserException {
                 if (partitionNames != null) {
                     throw new AnalysisException("repeat setting of partition names");
                 }
-                partitionNames = (PartitionNames) parseNode;
-                partitionNames.analyze(null);
+                PartitionNames partitionNamesNode = (PartitionNames) parseNode;
+                partitionNamesNode.analyze(null);
+                partitionNames = partitionNamesNode.getPartitionNames();
             }
         }
         routineLoadDesc = new RoutineLoadDesc(columnSeparator, importColumnsStmt, importWhereStmt,
-                                              partitionNames.getPartitionNames());
+                                              partitionNames);
     }

     private void checkJobProperties() throws AnalysisException {
diff --git a/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java b/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
index 38a91fce551102..547729f8e53bb3 100644
--- a/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
+++ b/fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
@@ -21,20 +21,20 @@
  * Created by zhaochun on 2018/4/23.
  */
 public class ImportColumnDesc {
-    private String column;
+    private String columnName;
     private Expr expr;

     public ImportColumnDesc(String column) {
-        this.column = column;
+        this.columnName = column;
     }

     public ImportColumnDesc(String column, Expr expr) {
-        this.column = column;
+        this.columnName = column;
         this.expr = expr;
     }

-    public String getColumn() {
-        return column;
+    public String getColumnName() {
+        return columnName;
     }

     public Expr getExpr() {
@@ -44,4 +44,14 @@ public Expr getExpr() {
     public boolean isColumn() {
         return expr == null;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(columnName);
+        if (expr != null) {
+            sb.append(" = ").append(expr.toSql());
+        }
+        return sb.toString();
+    }
 }
diff --git a/fe/src/main/java/org/apache/doris/catalog/Partition.java b/fe/src/main/java/org/apache/doris/catalog/Partition.java
index 942e2bf3202255..791cfdd36ae915 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Partition.java
@@ -239,6 +239,10 @@ public long getDataSize() {
         return dataSize;
     }

+    public boolean hasData() {
+        return !(visibleVersion == PARTITION_INIT_VERSION && visibleVersionHash == PARTITION_INIT_VERSION_HASH);
+    }
+
     public static Partition read(DataInput in) throws IOException {
         Partition partition = new Partition();
         partition.readFields(in);
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 47716b31cebdc1..e118d6c4c5bbf7 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -17,8 +17,6 @@

 package org.apache.doris.load.routineload;

-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
@@ -41,6 +39,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;

 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index ef167cfc05da0b..fc298183f25307 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -17,8 +17,6 @@

 package org.apache.doris.load.routineload;

-import com.google.common.base.Joiner;
-import com.google.gson.Gson;
 import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.Expr;
@@ -55,9 +53,11 @@
 import org.apache.doris.transaction.TxnStateChangeListener;

 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.gson.Gson;

 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -136,7 +136,7 @@ public boolean isFinalState() {
     protected long authCode;
     // protected RoutineLoadDesc routineLoadDesc; // optional
     protected List<String> partitions; // optional
-    protected Map<String, Expr> columnToColumnExpr; // optional
+    protected List<ImportColumnDesc> columnDescs; // optional
     protected Expr whereExpr; // optional
     protected ColumnSeparator columnSeparator; // optional
     protected int desireTaskConcurrentNum; // optional
@@ -240,9 +240,9 @@ private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
         if (routineLoadDesc.getColumnsInfo() != null) {
             ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
             if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
-                columnToColumnExpr = Maps.newHashMap();
+                columnDescs = Lists.newArrayList();
                 for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
-                    columnToColumnExpr.put(columnDesc.getColumn(), columnDesc.getExpr());
+                    columnDescs.add(columnDesc);
                 }
             }
         }
@@ -337,8 +337,8 @@ public List<String> getPartitions() {
         return partitions;
     }

-    public Map<String, Expr> getColumnToColumnExpr() {
-        return columnToColumnExpr;
+    public List<ImportColumnDesc> getColumnDescs() {
+        return columnDescs;
     }

     public Expr getWhereExpr() {
@@ -488,8 +488,11 @@ private void updateNumOfData(long numOfErrorRows, long numOfTotalRows, boolean i
                                   .add("max_error_num", maxErrorNum)
                                   .add("msg", "current error rows is more then max error num, begin to pause job")
                                   .build());
-                // remove all of task in jobs and change job state to paused
-                updateState(JobState.PAUSED, "current error rows of job is more then max error num", isReplay);
+                // if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB
+                if (!isReplay) {
+                    // remove all of task in jobs and change job state to paused
+                    updateState(JobState.PAUSED, "current error rows of job is more then max error num", isReplay);
+                }
             }

             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
@@ -508,8 +511,10 @@ private void updateNumOfData(long numOfErrorRows, long numOfTotalRows, boolean i
                                   .add("max_error_num", maxErrorNum)
                                   .add("msg", "current error rows is more then max error rows, begin to pause job")
                                   .build());
-                // remove all of task in jobs and change job state to paused
-                updateState(JobState.PAUSED, "current error rows is more then max error num", isReplay);
+                if (!isReplay) {
+                    // remove all of task in jobs and change job state to paused
+                    updateState(JobState.PAUSED, "current error rows is more then max error num", isReplay);
+                }
             // reset currentTotalNum and currentErrorNum
             currentErrorRows = 0;
             currentTotalRows = 0;
@@ -624,6 +629,7 @@ public ListenResult onCommitted(TransactionState txnState) throws TransactionExc

     @Override
     public void replayOnCommitted(TransactionState txnState) {
+        Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState);
         replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
         LOG.debug("replay on committed: {}", txnState);
     }
@@ -685,8 +691,11 @@ public ListenResult onAborted(TransactionState txnState, String txnStatusChangeR

     @Override
     public void replayOnAborted(TransactionState txnState) {
-        replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
-        LOG.debug("replay on aborted: {}", txnState);
+        // attachment may be null if this task is aborted by FE
+        if (txnState.getTxnCommitAttachment() != null) {
+            replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
+        }
+        LOG.debug("replay on aborted: {}, has attachment: {}", txnState, txnState.getTxnCommitAttachment() == null);
     }

     // check task exists or not before call method
@@ -931,8 +940,7 @@ public List<List<String>> getTasksShowInfo() {
     private String jobPropertiesToJsonString() {
         Map<String, String> jobProperties = Maps.newHashMap();
         jobProperties.put("partitions", partitions == null ? STAR_STRING : Joiner.on(",").join(partitions));
-        jobProperties.put("columnToColumnExpr", columnToColumnExpr == null ?
-                STAR_STRING : Joiner.on(",").withKeyValueSeparator(":").join(columnToColumnExpr));
+        jobProperties.put("columnToColumnExpr", columnDescs == null ? STAR_STRING : Joiner.on(",").join(columnDescs));
         jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : whereExpr.toString());
         jobProperties.put("columnSeparator", columnSeparator == null ? "\t" : columnSeparator.toString());
         jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum));
diff --git a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
index a88f02fab7ca95..726c04a0bbd39d 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -74,6 +74,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;

 /**
  * Full scan of an Olap table.
@@ -508,8 +509,14 @@ private void getScanRangeLocations(Analyzer analyzer) throws UserException, Anal
         if (partitionIds == null) {
             partitionIds = new ArrayList<Long>();
             for (Partition partition : olapTable.getPartitions()) {
+                if (!partition.hasData()) {
+                    continue;
+                }
                 partitionIds.add(partition.getId());
             }
+        } else {
+            partitionIds = partitionIds.stream().filter(id -> olapTable.getPartition(id).hasData()).collect(
+                    Collectors.toList());
         }

         selectedPartitionNum = partitionIds.size();
diff --git a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 4f722f8afb5f05..8919ea16f9e37b 100644
--- a/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -18,18 +18,13 @@
 package org.apache.doris.planner;

 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.ImportColumnDesc;
-import org.apache.doris.analysis.ImportColumnsStmt;
-import org.apache.doris.analysis.ImportWhereStmt;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
@@ -49,7 +44,6 @@
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocations;
-import org.apache.doris.thrift.TStreamLoadPutRequest;

 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -57,7 +51,6 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

-import java.io.StringReader;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
@@ -124,14 +117,14 @@ public void init(Analyzer analyzer) throws UserException {
         // columns: k1, k2, v1, v2=k1 + k2
         // this means that there are three columns(k1, k2, v1) in source file,
         // and v2 is derived from (k1 + k2)
-        if (streamLoadTask.getColumnToColumnExpr() != null && streamLoadTask.getColumnToColumnExpr().size() != 0) {
-            for (Map.Entry<String, Expr> entry : streamLoadTask.getColumnToColumnExpr().entrySet()) {
+        if (streamLoadTask.getColumnExprDesc() != null && streamLoadTask.getColumnExprDesc().size() != 0) {
+            for (ImportColumnDesc importColumnDesc : streamLoadTask.getColumnExprDesc()) {
                 // make column name case match with real column name
-                String column = entry.getKey();
-                String realColName = dstTable.getColumn(column) == null ? column
-                        : dstTable.getColumn(column).getName();
-                if (entry.getValue() != null) {
-                    exprsByName.put(realColName, entry.getValue());
+                String columnName = importColumnDesc.getColumnName();
+                String realColName = dstTable.getColumn(columnName) == null ? columnName
+                        : dstTable.getColumn(columnName).getName();
+                if (importColumnDesc.getExpr() != null) {
+                    exprsByName.put(realColName, importColumnDesc.getExpr());
                 } else {
                     SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
                     slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
diff --git a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
index a477a2a920738c..d62bfb98b03902 100644
--- a/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -29,7 +29,6 @@
 import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
@@ -37,13 +36,12 @@
 import org.apache.doris.thrift.TUniqueId;

 import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;

 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.io.StringReader;

-import java.util.Map;
+import java.util.List;
 import java.util.UUID;

@@ -56,7 +54,7 @@ public class StreamLoadTask {
     private TFileFormatType formatType;

     // optional
-    private Map<String, Expr> columnToColumnExpr;
+    private List<ImportColumnDesc> columnExprDesc;
     private Expr whereExpr;
     private ColumnSeparator columnSeparator;
     private String partitions;
@@ -85,8 +83,8 @@ public TFileFormatType getFormatType() {
         return formatType;
     }

-    public Map<String, Expr> getColumnToColumnExpr() {
-        return columnToColumnExpr;
+    public List<ImportColumnDesc> getColumnExprDesc() {
+        return columnExprDesc;
     }

     public Expr getWhereExpr() {
@@ -128,6 +126,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws
         switch (request.getFileType()) {
             case FILE_LOCAL:
                 path = request.getPath();
+                break;
+            default:
+                throw new UserException("unsupported file type, type=" + request.getFileType());
         }
     }

@@ -143,7 +144,7 @@ public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
     }

     private void setOptionalFromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
-        columnToColumnExpr = routineLoadJob.getColumnToColumnExpr();
+        columnExprDesc = routineLoadJob.getColumnDescs();
         whereExpr = routineLoadJob.getWhereExpr();
         columnSeparator = routineLoadJob.getColumnSeparator();
         partitions = routineLoadJob.getPartitions() == null ? null : Joiner.on(",").join(routineLoadJob.getPartitions());
@@ -173,10 +174,7 @@ private void setColumnToColumnExpr(String columns) throws UserException {
         }

         if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
-            columnToColumnExpr = Maps.newHashMap();
-            for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
-                columnToColumnExpr.put(columnDesc.getColumn(), columnDesc.getExpr());
-            }
+            columnExprDesc = columnsStmt.getColumns();
         }
     }
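
Note on the column-mapping refactor above: the job now carries an ordered list of ImportColumnDesc instead of a Map from column name to Expr, so a declaration such as "columns: k1, k2, v1, v2 = k1 + k2" (see the comment kept in StreamLoadScanNode) keeps its declared order and each entry records whether it is a plain source column or a derived expression. The sketch below is a minimal standalone illustration of that shape; ColumnDesc is a hypothetical stand-in for org.apache.doris.analysis.ImportColumnDesc with the expression reduced to a String, not the real Doris class.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for ImportColumnDesc: column name plus an optional expression.
class ColumnDesc {
    private final String columnName;
    private final String expr; // null means the value comes straight from the source file

    ColumnDesc(String columnName) {
        this(columnName, null);
    }

    ColumnDesc(String columnName, String expr) {
        this.columnName = columnName;
        this.expr = expr;
    }

    boolean isColumn() {
        return expr == null;
    }

    @Override
    public String toString() {
        return expr == null ? columnName : columnName + " = " + expr;
    }
}

public class ColumnMappingDemo {
    public static void main(String[] args) {
        // columns: k1, k2, v1, v2 = k1 + k2 -> three source columns and one derived column
        List<ColumnDesc> columnDescs = new ArrayList<>();
        columnDescs.add(new ColumnDesc("k1"));
        columnDescs.add(new ColumnDesc("k2"));
        columnDescs.add(new ColumnDesc("v1"));
        columnDescs.add(new ColumnDesc("v2", "k1 + k2"));

        // Iterating the list preserves the user-declared order, so source columns can be
        // matched to file positions before the derived column is evaluated.
        for (ColumnDesc desc : columnDescs) {
            System.out.println((desc.isColumn() ? "source  : " : "derived : ") + desc);
        }
    }
}

Running the sketch prints the three source columns followed by the derived v2 entry, roughly mirroring how StreamLoadScanNode.init() walks getColumnExprDesc() and treats entries with a non-null expression differently from plain columns.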