11 changes: 8 additions & 3 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -149,6 +149,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
 
     // copy one
     std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;
+    MonotonicStopWatch consumer_watch;
     MonotonicStopWatch watch;
     watch.start();
     Status st;
@@ -166,10 +167,11 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
         }
 
         if (left_time <= 0 || left_rows <= 0 || left_bytes <=0) {
-            VLOG(3) << "kafka consume batch done"
+            LOG(INFO) << "kafka consume batch done"
                 << ". left time=" << left_time
                 << ", left rows=" << left_rows
-                << ", left bytes=" << left_bytes;
+                << ", left bytes=" << left_bytes
+                << ", consumer time(ms)=" << consumer_watch.elapsed_time() / 1000 / 1000;
 
             if (left_bytes == ctx->max_batch_size) {
                 // nothing to be consumed, cancel it
@@ -182,16 +184,19 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
                 DCHECK(left_rows < ctx->max_batch_rows);
                 kakfa_pipe->finish();
                 ctx->kafka_info->cmt_offset = std::move(cmt_offset);
+                ctx->receive_bytes = ctx->max_batch_size - left_bytes;
                 _finished = true;
                 return Status::OK;
             }
         }
 
         // consume 1 message at a time
+        consumer_watch.start();
         RdKafka::Message *msg = _k_consumer->consume(1000 /* timeout, ms */);
+        consumer_watch.stop();
         switch (msg->err()) {
             case RdKafka::ERR_NO_ERROR:
-                LOG(INFO) << "get kafka message"
+                VLOG(3) << "get kafka message"
                     << ", partition: " << msg->partition()
                     << ", offset: " << msg->offset()
                     << ", len: " << msg->len();
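Two things change here: per-message logging drops from LOG(INFO) to VLOG(3) while the batch summary moves up to LOG(INFO), so steady-state output is one line per batch, and a second stopwatch wraps only the consume() call, separating time spent waiting on Kafka from total batch time. A minimal Java sketch of that accumulate-across-calls pattern, assuming (as the summary log line implies) that the stopwatch keeps a running total over repeated start/stop pairs; the names here are illustrative, not the Doris API:

    // Accumulates elapsed time across many short sections, e.g. timing each
    // consume() call inside a batch loop and logging one total at the end.
    final class AccumulatingStopWatch {
        private long totalNanos = 0;
        private long startNanos = -1;

        void start() {
            startNanos = System.nanoTime();
        }

        void stop() {
            if (startNanos >= 0) {
                totalNanos += System.nanoTime() - startNanos;
                startNanos = -1;
            }
        }

        long elapsedMillis() {
            return totalNanos / 1_000_000;
        }
    }

    // Usage, mirroring the loop above:
    //   watch.start(); msg = consumer.consume(timeoutMs); watch.stop();
    //   ...at batch end: log("consumer time(ms)=" + watch.elapsedMillis());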
2 changes: 1 addition & 1 deletion fe/src/main/cup/sql_parser.cup
@@ -1195,7 +1195,7 @@ load_property ::=
     {:
         RESULT = columnsInfo;
     :}
-    | where_clause_without_null:wherePredicate
+    | import_where_stmt:wherePredicate
     {:
         RESULT = wherePredicate;
     :}
@@ -29,8 +29,7 @@
 import java.util.Map;
 
 // FORMAT:
-// ALTER SYSTEM SET GLOBAL LOAD_ERROR_URL= "mysql://user:password@host:port[/database[/table]]"
-// ALTER SYSTEM SET GLOBAL LOAD_ERROR_URL= "broker://"
+// ALTER SYSTEM SET LOAD ERRORS HUB properties("type" = "xxx");
 
 public class AlterLoadErrorUrlClause extends AlterClause {
     private static final Logger LOG = LogManager.getLogger(AlterLoadErrorUrlClause.class);
@@ -226,7 +226,7 @@ public void checkLoadProperties(Analyzer analyzer) throws UserException {
         ColumnSeparator columnSeparator = null;
         ImportColumnsStmt importColumnsStmt = null;
         ImportWhereStmt importWhereStmt = null;
-        PartitionNames partitionNames = null;
+        List<String> partitionNames = null;
         for (ParseNode parseNode : loadPropertyList) {
             if (parseNode instanceof ColumnSeparator) {
                 // check column separator
@@ -252,12 +252,13 @@ public void checkLoadProperties(Analyzer analyzer) throws UserException {
                 if (partitionNames != null) {
                     throw new AnalysisException("repeat setting of partition names");
                 }
-                partitionNames = (PartitionNames) parseNode;
-                partitionNames.analyze(null);
+                PartitionNames partitionNamesNode = (PartitionNames) parseNode;
+                partitionNamesNode.analyze(null);
+                partitionNames = partitionNamesNode.getPartitionNames();
             }
         }
         routineLoadDesc = new RoutineLoadDesc(columnSeparator, importColumnsStmt, importWhereStmt,
-                partitionNames.getPartitionNames());
+                partitionNames);
     }
 
     private void checkJobProperties() throws AnalysisException {
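Beyond swapping the field to a plain name list, this hunk fixes a latent NullPointerException: the old code called partitionNames.getPartitionNames() unconditionally, which throws when the statement names no partitions. A condensed sketch of the new contract (variable names as in the hunk, surrounding property checks elided):

    List<String> partitionNames = null;   // stays null when no partitions are named
    for (ParseNode parseNode : loadPropertyList) {
        if (parseNode instanceof PartitionNames) {
            PartitionNames node = (PartitionNames) parseNode;
            node.analyze(null);                        // validate, then keep only the names
            partitionNames = node.getPartitionNames();
        }
    }
    // null now flows through safely; downstream (see jobPropertiesToJsonString
    // below) renders a null partition list as "*", i.e. all partitions
    routineLoadDesc = new RoutineLoadDesc(columnSeparator, importColumnsStmt,
            importWhereStmt, partitionNames);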
20 changes: 15 additions & 5 deletions fe/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
@@ -21,20 +21,20 @@
  * Created by zhaochun on 2018/4/23.
  */
 public class ImportColumnDesc {
-    private String column;
+    private String columnName;
     private Expr expr;
 
     public ImportColumnDesc(String column) {
-        this.column = column;
+        this.columnName = column;
     }
 
     public ImportColumnDesc(String column, Expr expr) {
-        this.column = column;
+        this.columnName = column;
         this.expr = expr;
     }
 
-    public String getColumn() {
-        return column;
+    public String getColumnName() {
+        return columnName;
     }
 
     public Expr getExpr() {
@@ -44,4 +44,14 @@ public Expr getExpr() {
     public boolean isColumn() {
         return expr == null;
     }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(columnName);
+        if (expr != null) {
+            sb.append(" = ").append(expr.toSql());
+        }
+        return sb.toString();
+    }
 }
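Illustrative only: the two shapes a desc can take and what the new toString() renders, given some Expr previously parsed from "k1 + k2". Joining a list of these with Joiner.on(",") is exactly how jobPropertiesToJsonString below serializes the column mapping.

    ImportColumnDesc plain = new ImportColumnDesc("k1");         // plain source column
    ImportColumnDesc derived = new ImportColumnDesc("v2", expr); // derived column
    // plain.isColumn()   -> true  (no expr: read straight from the source file)
    // derived.isColumn() -> false (routed into exprsByName by the scan node)
    // plain.toString()   -> "k1"
    // derived.toString() -> "v2 = " + expr.toSql()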
4 changes: 4 additions & 0 deletions fe/src/main/java/org/apache/doris/catalog/Partition.java
@@ -239,6 +239,10 @@ public long getDataSize() {
         return dataSize;
     }
 
+    public boolean hasData() {
+        return !(visibleVersion == PARTITION_INIT_VERSION && visibleVersionHash == PARTITION_INIT_VERSION_HASH);
+    }
+
     public static Partition read(DataInput in) throws IOException {
         Partition partition = new Partition();
         partition.readFields(in);
@@ -17,8 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
@@ -41,6 +39,8 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -17,8 +17,6 @@
 
 package org.apache.doris.load.routineload;
 
-import com.google.common.base.Joiner;
-import com.google.gson.Gson;
 import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.Expr;
@@ -55,9 +53,11 @@
 import org.apache.doris.transaction.TxnStateChangeListener;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -136,7 +136,7 @@ public boolean isFinalState() {
     protected long authCode;
     // protected RoutineLoadDesc routineLoadDesc; // optional
     protected List<String> partitions; // optional
-    protected Map<String, Expr> columnToColumnExpr; // optional
+    protected List<ImportColumnDesc> columnDescs; // optional
     protected Expr whereExpr; // optional
     protected ColumnSeparator columnSeparator; // optional
     protected int desireTaskConcurrentNum; // optional
@@ -240,9 +240,9 @@ private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
         if (routineLoadDesc.getColumnsInfo() != null) {
             ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
             if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
-                columnToColumnExpr = Maps.newHashMap();
+                columnDescs = Lists.newArrayList();
                 for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
-                    columnToColumnExpr.put(columnDesc.getColumn(), columnDesc.getExpr());
+                    columnDescs.add(columnDesc);
                 }
             }
         }
@@ -337,8 +337,8 @@ public List<String> getPartitions() {
         return partitions;
     }
 
-    public Map<String, Expr> getColumnToColumnExpr() {
-        return columnToColumnExpr;
+    public List<ImportColumnDesc> getColumnDescs() {
+        return columnDescs;
     }
 
     public Expr getWhereExpr() {
@@ -488,8 +488,11 @@ private void updateNumOfData(long numOfErrorRows, long numOfTotalRows, boolean i
                     .add("max_error_num", maxErrorNum)
                     .add("msg", "current error rows is more then max error num, begin to pause job")
                     .build());
-            // remove all of task in jobs and change job state to paused
-            updateState(JobState.PAUSED, "current error rows of job is more then max error num", isReplay);
+            // if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB
+            if (!isReplay) {
+                // remove all of task in jobs and change job state to paused
+                updateState(JobState.PAUSED, "current error rows of job is more then max error num", isReplay);
+            }
         }
 
         LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
@@ -508,8 +511,10 @@ private void updateNumOfData(long numOfErrorRows, long numOfTotalRows, boolean i
                     .add("max_error_num", maxErrorNum)
                     .add("msg", "current error rows is more then max error rows, begin to pause job")
                     .build());
-            // remove all of task in jobs and change job state to paused
-            updateState(JobState.PAUSED, "current error rows is more then max error num", isReplay);
+            if (!isReplay) {
+                // remove all of task in jobs and change job state to paused
+                updateState(JobState.PAUSED, "current error rows is more then max error num", isReplay);
+            }
             // reset currentTotalNum and currentErrorNum
             currentErrorRows = 0;
             currentTotalRows = 0;
@@ -624,6 +629,7 @@ public ListenResult onCommitted(TransactionState txnState) throws TransactionExc
 
     @Override
     public void replayOnCommitted(TransactionState txnState) {
+        Preconditions.checkNotNull(txnState.getTxnCommitAttachment(), txnState);
         replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
         LOG.debug("replay on committed: {}", txnState);
     }
@@ -685,8 +691,11 @@ public ListenResult onAborted(TransactionState txnState, String txnStatusChangeR
 
     @Override
     public void replayOnAborted(TransactionState txnState) {
-        replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
-        LOG.debug("replay on aborted: {}", txnState);
+        // attachment may be null if this task is aborted by FE
+        if (txnState.getTxnCommitAttachment() != null) {
+            replayUpdateProgress((RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment());
+        }
+        LOG.debug("replay on aborted: {}, has attachment: {}", txnState, txnState.getTxnCommitAttachment() == null);
     }
 
     // check task exists or not before call method
@@ -931,8 +940,7 @@ public List<List<String>> getTasksShowInfo() {
     private String jobPropertiesToJsonString() {
         Map<String, String> jobProperties = Maps.newHashMap();
         jobProperties.put("partitions", partitions == null ? STAR_STRING : Joiner.on(",").join(partitions));
-        jobProperties.put("columnToColumnExpr", columnToColumnExpr == null ?
-                STAR_STRING : Joiner.on(",").withKeyValueSeparator(":").join(columnToColumnExpr));
+        jobProperties.put("columnToColumnExpr", columnDescs == null ? STAR_STRING : Joiner.on(",").join(columnDescs));
         jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : whereExpr.toString());
         jobProperties.put("columnSeparator", columnSeparator == null ? "\t" : columnSeparator.toString());
         jobProperties.put("maxErrorNum", String.valueOf(maxErrorNum));
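Both updateNumOfData hunks apply the same idiom: only the live path mutates and logs the state change, while a replay thread skips it because the PAUSED transition arrives through its own OP_CHANGE_ROUTINE_LOAD_JOB log entry and would otherwise be applied twice. A minimal sketch of the guard, with a hypothetical method shape:

    // Live path: pause the job (which writes its own edit log entry).
    // Replay path: do nothing; the logged PAUSED transition is replayed separately.
    private void pauseOnTooManyErrors(boolean isReplay) {
        if (!isReplay) {
            updateState(JobState.PAUSED, "current error rows is more then max error num", false);
        }
    }

The same live/replay split explains the two replay hooks above: a committed transaction always carries an RLTaskTxnCommitAttachment (hence the added checkNotNull), while a transaction aborted by the FE may carry none, so replayOnAborted now tolerates a null attachment.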
7 changes: 7 additions & 0 deletions fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -74,6 +74,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Full scan of an Olap table.
@@ -508,8 +509,14 @@ private void getScanRangeLocations(Analyzer analyzer) throws UserException, Anal
         if (partitionIds == null) {
             partitionIds = new ArrayList<Long>();
             for (Partition partition : olapTable.getPartitions()) {
+                if (!partition.hasData()) {
+                    continue;
+                }
                 partitionIds.add(partition.getId());
             }
+        } else {
+            partitionIds = partitionIds.stream().filter(id -> olapTable.getPartition(id).hasData()).collect(
+                    Collectors.toList());
         }
 
         selectedPartitionNum = partitionIds.size();
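Together with Partition.hasData() above, this prunes never-loaded partitions from the scan whether or not the query pre-selected partition ids. A hedged sketch of the rule in isolation, with the planner bookkeeping omitted (olapTable and Partition as in the Doris catalog):

    // Keep only partitions that have ever had a load committed, i.e. have
    // moved past the initial version/version-hash pair.
    List<Long> selected = olapTable.getPartitions().stream()
            .filter(Partition::hasData)
            .map(Partition::getId)
            .collect(Collectors.toList());

After filtering, selectedPartitionNum counts only partitions with data, so a never-loaded table selects zero partitions.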
21 changes: 7 additions & 14 deletions fe/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -18,18 +18,13 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.ColumnSeparator;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.ImportColumnDesc;
-import org.apache.doris.analysis.ImportColumnsStmt;
-import org.apache.doris.analysis.ImportWhereStmt;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
@@ -49,15 +44,13 @@
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocations;
-import org.apache.doris.thrift.TStreamLoadPutRequest;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.StringReader;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
@@ -124,14 +117,14 @@ public void init(Analyzer analyzer) throws UserException {
         // columns: k1, k2, v1, v2=k1 + k2
         // this means that there are three columns(k1, k2, v1) in source file,
         // and v2 is derived from (k1 + k2)
-        if (streamLoadTask.getColumnToColumnExpr() != null && streamLoadTask.getColumnToColumnExpr().size() != 0) {
-            for (Map.Entry<String, Expr> entry : streamLoadTask.getColumnToColumnExpr().entrySet()) {
+        if (streamLoadTask.getColumnExprDesc() != null && streamLoadTask.getColumnExprDesc().size() != 0) {
+            for (ImportColumnDesc importColumnDesc : streamLoadTask.getColumnExprDesc()) {
                 // make column name case match with real column name
-                String column = entry.getKey();
-                String realColName = dstTable.getColumn(column) == null ? column
-                        : dstTable.getColumn(column).getName();
-                if (entry.getValue() != null) {
-                    exprsByName.put(realColName, entry.getValue());
+                String columnName = importColumnDesc.getColumnName();
+                String realColName = dstTable.getColumn(columnName) == null ? columnName
+                        : dstTable.getColumn(columnName).getName();
+                if (importColumnDesc.getExpr() != null) {
+                    exprsByName.put(realColName, importColumnDesc.getExpr());
                 } else {
                     SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
                     slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));