3 changes: 2 additions & 1 deletion be/src/runtime/routine_load/data_consumer.cpp
@@ -145,7 +145,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
<< ", batch size: " << left_bytes
<< ". " << ctx->brief();

-        // copy one
+    // copy one
std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;
MonotonicStopWatch watch;
watch.start();
@@ -171,6 +171,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {

if (left_bytes == ctx->kafka_info->max_batch_size) {
// nothing to be consumed, cancel it
+            // we do not allow finishing stream load pipe without data
kakfa_pipe->cancel();
_cancelled = true;
return Status::CANCELLED;
3 changes: 3 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
@@ -58,6 +58,9 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
if ((0.0 + ctx->number_filtered_rows) / num_total_rows > ctx->max_filter_ratio) {
status = Status("too many filtered rows");
}
+    else if(ctx->number_loaded_rows==0){
+        status = Status("all partitions have no load data");
+    }
if (ctx->number_filtered_rows > 0 &&
!executor->runtime_state()->get_error_log_file_path().empty()) {

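Read together with the data_consumer.cpp hunk above, these two BE changes turn an empty consume batch into a hard failure instead of a silently "finished" load: the consumer cancels the pipe when nothing was consumed, and the executor rejects a plan fragment that loaded zero rows. A minimal standalone sketch of the resulting acceptance rule, with illustrative names rather than the Doris API (the division guard is an added assumption):

// Sketch only: mirrors the order of checks in stream_load_executor.cpp above,
// filter-ratio first, then the new zero-loaded-rows guard.
final class LoadAcceptanceSketch {
    /** Returns an error message, or null if the load result is acceptable. */
    static String check(long totalRows, long filteredRows, long loadedRows, double maxFilterRatio) {
        if (totalRows > 0 && (double) filteredRows / totalRows > maxFilterRatio) {
            return "too many filtered rows";
        }
        if (loadedRows == 0) {
            return "all partitions have no load data";
        }
        return null;
    }
}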
2 changes: 1 addition & 1 deletion be/src/util/uid_util.h
@@ -59,7 +59,7 @@ struct UniqueId {
std::string to_string() const {
char buf[33];
to_hex(hi, buf);
-        buf[16] = ':';
+        buf[16] = '-';
to_hex(lo, buf + 17);
return {buf, 33};
}
12 changes: 10 additions & 2 deletions fe/src/main/java/org/apache/doris/common/util/DebugUtil.java
@@ -24,6 +24,7 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.DecimalFormat;
+import java.util.UUID;

public class DebugUtil {
public static final DecimalFormat DECIMAL_FORMAT_SCALE_3 = new DecimalFormat("#.000");
@@ -120,13 +121,20 @@ public static Pair<Double, String> getByteUint(long value) {

public static String printId(final TUniqueId id) {
StringBuilder builder = new StringBuilder();
-        builder.append(Long.toHexString(id.hi)).append(":").append(Long.toHexString(id.lo));
+        builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo));
return builder.toString();
}

+    public static String printId(final UUID id) {
+        TUniqueId tUniqueId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        StringBuilder builder = new StringBuilder();
+        builder.append(Long.toHexString(tUniqueId.hi)).append("-").append(Long.toHexString(tUniqueId.lo));
+        return builder.toString();
+    }
+
public static String printId(final PUniqueId id) {
StringBuilder builder = new StringBuilder();
-        builder.append(Long.toHexString(id.hi)).append(":").append(Long.toHexString(id.lo));
+        builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo));
return builder.toString();
}

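With this change, BE's UniqueId::to_string (uid_util.h above) and FE's DebugUtil.printId both render an id as hex(hi), a '-', then hex(lo), so an id printed in an FE log can be matched textually against BE output and transaction labels. A minimal sketch of the shared format; note as a hedge that Long.toHexString does not zero-pad, while the BE buffer writes fixed positions, so only the separator is what this PR aligns:

import java.util.UUID;

class PrintIdDemo {
    // Same shape as DebugUtil.printId in the hunk above.
    static String printId(long hi, long lo) {
        return Long.toHexString(hi) + "-" + Long.toHexString(lo);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        // prints something like "6e3cf8e2b1a4d0f3-9c0b7a5e2d114f68"
        System.out.println(printId(id.getMostSignificantBits(), id.getLeastSignificantBits()));
    }
}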
fe/src/main/java/org/apache/doris/common/util/LogBuilder.java
@@ -44,7 +44,7 @@ public LogBuilder(LogKey key, Long identifier) {

public LogBuilder(LogKey key, UUID identifier) {
TUniqueId tUniqueId = new TUniqueId(identifier.getMostSignificantBits(), identifier.getLeastSignificantBits());
-        stringBuffer = new StringBuffer().append(key.name()).append("=").append(tUniqueId.toString()).append(", ");
+        stringBuffer = new StringBuffer().append(key.name()).append("=").append(DebugUtil.printId(tUniqueId)).append(", ");
entries = Lists.newLinkedList();
}

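The LogBuilder constructor previously stringified the Thrift TUniqueId struct itself; routing it through DebugUtil.printId makes the logged key match the txn label format used in the files below. An illustrative sketch of the logged key after the change (only the hi-lo hex form comes from the diff):

import java.util.UUID;

class TaskKeyDemo {
    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        String key = Long.toHexString(id.getMostSignificantBits())
                + "-" + Long.toHexString(id.getLeastSignificantBits());
        // the log line now starts like: ROUINTE_LOAD_TASK=<hi>-<lo>, ...
        System.out.println("ROUINTE_LOAD_TASK=" + key + ", ");
    }
}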
fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -17,6 +17,7 @@

package org.apache.doris.load.routineload;

+import com.google.common.base.Joiner;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
@@ -26,6 +27,7 @@
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.RoutineLoadDesc;
@@ -34,6 +36,7 @@

import com.google.common.annotations.VisibleForTesting;

+import org.apache.doris.transaction.TransactionState;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.logging.log4j.LogManager;
@@ -166,6 +169,20 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM);
}

+    @Override
+    boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
+        if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
+                && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().size() == 0) {
+            LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
+                    .add("job_id", id)
+                    .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
+                    .add("progress_partition_offset_size", 0)
+                    .add("msg", "commit attachment info is incorrect"));
+            return false;
+        }
+        return true;
+    }
+
@Override
protected void updateProgress(RLTaskTxnCommitAttachment attachment) {
super.updateProgress(attachment);
@@ -185,7 +202,7 @@ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoad
}

@Override
-    protected void executeUpdate() {
+    protected void unprotectUpdateProgress() {
updateNewPartitionProgress();
}

@@ -195,30 +212,47 @@ protected void executeUpdate() {
    // update current kafka partition at the same time
    // current kafka partitions = customKafkaPartitions == 0 ? all of partition of kafka topic : customKafkaPartitions
    @Override
-    protected boolean needReschedule() {
-        if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) {
-            currentKafkaPartitions = customKafkaPartitions;
-            return false;
-        } else {
-            List<Integer> newCurrentKafkaPartition;
-            try {
-                newCurrentKafkaPartition = getAllKafkaPartitions();
-            } catch (Exception e) {
-                LOG.warn("Job {} failed to fetch all current partition", id);
-                return false;
-            }
-            if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) {
-                if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) {
-                    currentKafkaPartitions = newCurrentKafkaPartition;
-                    return true;
-                } else {
-                    return false;
-                }
-            } else {
-                currentKafkaPartitions = newCurrentKafkaPartition;
-                return true;
-            }
-        }
-    }
+    protected boolean unprotectNeedReschedule() {
+        // only running and need_schedule job need to be changed current kafka partitions
+        if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) {
+            if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) {
+                currentKafkaPartitions = customKafkaPartitions;
+                return false;
+            } else {
+                List<Integer> newCurrentKafkaPartition;
+                try {
+                    newCurrentKafkaPartition = getAllKafkaPartitions();
+                } catch (Exception e) {
+                    LOG.warn("Job {} failed to fetch all current partition", id);
+                    return false;
+                }
+                if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) {
+                    if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) {
+                        currentKafkaPartitions = newCurrentKafkaPartition;
+                        LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                                .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
+                                .add("msg", "current kafka partitions has been change")
+                                .build());
+                        return true;
+                    } else {
+                        return false;
+                    }
+                } else {
+                    currentKafkaPartitions = newCurrentKafkaPartition;
+                    LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                            .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
+                            .add("msg", "current kafka partitions has been change")
+                            .build());
+                    return true;
+                }
+            }
+
+        } else {
+            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                    .add("job_state", state)
+                    .add("msg", "ignore this turn of checking changed partition when job state is not running")
+                    .build());
+            return false;
+        }
+    }

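The new checkCommitInfo override guards progress updates: an attachment that claims loaded rows but carries an empty partition-to-offset map is internally inconsistent and must not advance the job's Kafka offsets. A standalone sketch of the check with simplified types; only the predicate itself comes from the hunk above:

import java.util.Map;

final class CommitInfoCheckSketch {
    // loaded rows must be backed by at least one consumed partition offset
    static boolean checkCommitInfo(long loadedRows, Map<Integer, Long> partitionIdToOffset) {
        if (loadedRows > 0 && partitionIdToOffset.isEmpty()) {
            // in the real job this is logged as "commit attachment info is incorrect"
            return false;
        }
        return true;
    }
}

In executeCommitTask (next file), updateProgress now runs only when this returns true, which is what keeps a bogus attachment from corrupting or hanging the job's progress.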
fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -30,6 +30,7 @@
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.RoutineLoadDesc;
@@ -57,7 +58,6 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -433,7 +433,7 @@ private void checkStateTransform(RoutineLoadJob.JobState desireState)
break;
case STOPPED:
case CANCELLED:
-                throw new UnsupportedOperationException("Could not transfrom " + state + " to " + desireState);
+                throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState);
default:
break;
}
@@ -456,7 +456,7 @@ private void updateNumOfData(long numOfErrorData, long numOfTotalData) {
.add("msg", "current error num is more then max error num, begin to pause job")
.build());
// remove all of task in jobs and change job state to paused
-            executePause("current error num of job is more then max error num");
+            updateState(JobState.PAUSED, "current error num of job is more then max error num");
}

LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
@@ -476,7 +476,7 @@ private void updateNumOfData(long numOfErrorData, long numOfTotalData) {
.add("msg", "current error num is more then max error num, begin to pause job")
.build());
// remove all of task in jobs and change job state to paused
-            executePause("current error num is more then max error num");
+            updateState(JobState.PAUSED, "current error num is more then max error num");
// reset currentTotalNum and currentErrorNum
currentErrorNum = 0;
currentTotalNum = 0;
@@ -513,7 +513,7 @@ public void beforeAborted(TransactionState txnState, String txnStatusChangeReaso
readLock();
try {
String taskId = txnState.getLabel();
-            if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().toString().equals(taskId))) {
+            if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> DebugUtil.printId(entity.getId()).equals(taskId))) {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("txn_id", txnState.getTransactionId())
.add("msg", "task will be aborted")
@@ -531,7 +531,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
// check if task has been aborted
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
routineLoadTaskInfoList.parallelStream()
-                        .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst();
+                        .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst();
if (!routineLoadTaskInfoOptional.isPresent()) {
throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed"
+ " while task " + txnState.getLabel() + "has been aborted ");
@@ -541,14 +541,15 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
}
}

+    // the task is committed when the correct number of rows is more then 0
@Override
public void onCommitted(TransactionState txnState) throws TransactionException {
writeLock();
try {
// step0: find task in job
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
routineLoadTaskInfoList.parallelStream()
-                            .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst();
+                            .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst();
if (routineLoadTaskInfoOptional.isPresent()) {
executeCommitTask(routineLoadTaskInfoOptional.get(), txnState);
} else {
@@ -572,27 +573,28 @@ public void onCommitted(TransactionState txnState) throws TransactionException {
}
}

// the task is aborted when the correct number of rows is more then 0
-    // be will abort txn when all of kafka data is wrong or total consume data is 0
-    // txn will be aborted but progress will be update
+    // be will abort txn when all of kafka data is wrong
+    // progress will be update otherwise the progress will be hung
@Override
public void onAborted(TransactionState txnState, String txnStatusChangeReason) {
if (txnStatusChangeReason != null) {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("txn_id", txnState.getTransactionId())
-                    .add("msg", "task will be reschedule when txn abort with reason " + txnStatusChangeReason)
+                    .add("msg", "txn abort with reason " + txnStatusChangeReason)
.build());
} else {
LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
.add("txn_id", txnState.getTransactionId())
-                    .add("msg", "task will be reschedule when txn abort").build());
+                    .add("msg", "txn abort").build());
}
writeLock();
try {
// step0: find task in job
Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
routineLoadTaskInfoList.parallelStream()
-                            .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst();
+                            .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst();
if (routineLoadTaskInfoOptional.isPresent()) {
// todo(ml): use previous be id depend on change reason
executeCommitTask(routineLoadTaskInfoOptional.get(), txnState);
@@ -625,8 +627,8 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact
.add("msg", "commit task will be ignore when attachment txn of task is null,"
+ " maybe task was committed by master when timeout")
.build());
-            } else {
-                // step1: update job progress
+            } else if (checkCommitInfo(rlTaskTxnCommitAttachment)) {
+                // step2: update job progress
updateProgress(rlTaskTxnCommitAttachment);
}

@@ -637,6 +639,9 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact
}
}

+    // check the correctness of commit info
+    abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment);
+
protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException {
// check table belong to db, partitions belong to table
if (stmt.getRoutineLoadDesc() == null) {
@@ -762,19 +767,24 @@ public void update() {
        }

        // check if partition has been changed
-        if (needReschedule()) {
-            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("msg", "Job need to be rescheduled")
-                    .build());
-            executeUpdate();
-            updateState(JobState.NEED_SCHEDULE);
-        }
+        writeLock();
+        try {
+            if (unprotectNeedReschedule()) {
+                LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                        .add("msg", "Job need to be rescheduled")
+                        .build());
+                unprotectUpdateProgress();
+                executeNeedSchedule();
+            }
+        } finally {
+            writeUnlock();
+        }
    }

-    protected void executeUpdate() {
+    protected void unprotectUpdateProgress() {
    }

-    protected boolean needReschedule() {
+    protected boolean unprotectNeedReschedule() {
        return false;
    }
}
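The update() rewrite moves locking up to the caller: helpers renamed with the unprotect prefix appear to require that the caller already holds the job's write lock (the naming convention the diff suggests), so the reschedule check and the progress update now run in one critical section instead of each taking the lock separately. A minimal sketch of the pattern:

import java.util.concurrent.locks.ReentrantReadWriteLock;

abstract class JobUpdateSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public void update() {
        lock.writeLock().lock();
        try {
            // check and mutation happen under one critical section, so the
            // partition list cannot change between them
            if (unprotectNeedReschedule()) {
                unprotectUpdateProgress();
                executeNeedSchedule();
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    // "unprotect" = caller must hold the write lock
    protected boolean unprotectNeedReschedule() { return false; }
    protected void unprotectUpdateProgress() {}
    // replaces the old direct updateState(JobState.NEED_SCHEDULE) call
    protected void executeNeedSchedule() {}
}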
fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -23,6 +23,7 @@
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
@@ -100,7 +101,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
// begin a txn for task
RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
-                routineLoadJob.getDbId(), id.toString(), -1, "streamLoad",
+                routineLoadJob.getDbId(), DebugUtil.printId(id), -1, "streamLoad",
TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob);
}

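The label passed to beginTransaction now comes from DebugUtil.printId, the same function RoutineLoadJob uses when it looks a task up by txnState.getLabel() in beforeCommitted, onCommitted, and onAborted, so the two sides can no longer drift apart in format. A small sketch of that round trip, with UUID standing in for the task id type:

import java.util.UUID;

class LabelRoundTripSketch {
    static String printId(UUID id) {
        return Long.toHexString(id.getMostSignificantBits())
                + "-" + Long.toHexString(id.getLeastSignificantBits());
    }

    public static void main(String[] args) {
        UUID taskId = UUID.randomUUID();
        String label = printId(taskId);                    // written at beginTxn
        System.out.println(label.equals(printId(taskId))); // later lookup matches: true
    }
}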