diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 0e701b3153a4a8..e52d3332d4af36 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -145,7 +145,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
             << ", batch size: " << left_bytes << ". " << ctx->brief();
-    // copy one
+// copy one
     std::map<int32_t, int64_t> cmt_offset = ctx->kafka_info->cmt_offset;
     MonotonicStopWatch watch;
     watch.start();
@@ -171,6 +171,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
         if (left_bytes == ctx->kafka_info->max_batch_size) {
             // nothing to be consumed, cancel it
+            // we do not allow finishing stream load pipe without data
             kakfa_pipe->cancel();
             _cancelled = true;
             return Status::CANCELLED;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 0cae6eb87511aa..774b0d7bb47c3c 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -58,6 +58,9 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
         if ((0.0 + ctx->number_filtered_rows) / num_total_rows > ctx->max_filter_ratio) {
             status = Status("too many filtered rows");
         }
+        else if (ctx->number_loaded_rows == 0) {
+            status = Status("all partitions have no load data");
+        }
 
         if (ctx->number_filtered_rows > 0 &&
                 !executor->runtime_state()->get_error_log_file_path().empty()) {
diff --git a/be/src/util/uid_util.h b/be/src/util/uid_util.h
index 92e2fec1cf6304..d7a73979876c4d 100644
--- a/be/src/util/uid_util.h
+++ b/be/src/util/uid_util.h
@@ -59,7 +59,7 @@ struct UniqueId {
     std::string to_string() const {
         char buf[33];
         to_hex(hi, buf);
-        buf[16] = ':';
+        buf[16] = '-';
         to_hex(lo, buf + 17);
         return {buf, 33};
     }
diff --git a/fe/src/main/java/org/apache/doris/common/util/DebugUtil.java b/fe/src/main/java/org/apache/doris/common/util/DebugUtil.java
index 5e239fcfdf2bd8..783c06d88b3068 100644
--- a/fe/src/main/java/org/apache/doris/common/util/DebugUtil.java
+++ b/fe/src/main/java/org/apache/doris/common/util/DebugUtil.java
@@ -24,6 +24,7 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.DecimalFormat;
+import java.util.UUID;
 
 public class DebugUtil {
     public static final DecimalFormat DECIMAL_FORMAT_SCALE_3 = new DecimalFormat("#.000");
@@ -120,13 +121,20 @@ public static Pair<Double, String> getByteUint(long value) {
 
     public static String printId(final TUniqueId id) {
         StringBuilder builder = new StringBuilder();
-        builder.append(Long.toHexString(id.hi)).append(":").append(Long.toHexString(id.lo));
+        builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo));
+        return builder.toString();
+    }
+
+    public static String printId(final UUID id) {
+        TUniqueId tUniqueId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
+        StringBuilder builder = new StringBuilder();
+        builder.append(Long.toHexString(tUniqueId.hi)).append("-").append(Long.toHexString(tUniqueId.lo));
         return builder.toString();
     }
 
     public static String printId(final PUniqueId id) {
         StringBuilder builder = new StringBuilder();
-        builder.append(Long.toHexString(id.hi)).append(":").append(Long.toHexString(id.lo));
+        builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo));
         return builder.toString();
     }
diff --git a/fe/src/main/java/org/apache/doris/common/util/LogBuilder.java b/fe/src/main/java/org/apache/doris/common/util/LogBuilder.java
index 755afb9b25cb57..434fa002df8b40 100644
--- a/fe/src/main/java/org/apache/doris/common/util/LogBuilder.java
+++ b/fe/src/main/java/org/apache/doris/common/util/LogBuilder.java
@@ -44,7 +44,7 @@ public LogBuilder(LogKey key, Long identifier) {
 
     public LogBuilder(LogKey key, UUID identifier) {
         TUniqueId tUniqueId = new TUniqueId(identifier.getMostSignificantBits(), identifier.getLeastSignificantBits());
-        stringBuffer = new StringBuffer().append(key.name()).append("=").append(tUniqueId.toString()).append(", ");
+        stringBuffer = new StringBuffer().append(key.name()).append("=").append(DebugUtil.printId(tUniqueId)).append(", ");
         entries = Lists.newLinkedList();
     }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index bb50347cb63777..757f3bbd10f222 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.routineload;
 
+import com.google.common.base.Joiner;
 import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
@@ -26,6 +27,7 @@
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.RoutineLoadDesc;
@@ -34,6 +36,7 @@
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.doris.transaction.TransactionState;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.logging.log4j.LogManager;
@@ -166,6 +169,20 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
         return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM);
     }
 
+    @Override
+    boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) {
+        if (rlTaskTxnCommitAttachment.getLoadedRows() > 0
+                && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().size() == 0) {
+            LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId()))
+                    .add("job_id", id)
+                    .add("loaded_rows", rlTaskTxnCommitAttachment.getLoadedRows())
+                    .add("progress_partition_offset_size", 0)
+                    .add("msg", "commit attachment info is incorrect"));
+            return false;
+        }
+        return true;
+    }
+
     @Override
     protected void updateProgress(RLTaskTxnCommitAttachment attachment) {
         super.updateProgress(attachment);
@@ -185,7 +202,7 @@ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoad
     }
 
     @Override
-    protected void executeUpdate() {
+    protected void unprotectUpdateProgress() {
         updateNewPartitionProgress();
     }
 
@@ -195,30 +212,47 @@ protected void executeUpdate() {
     // update current kafka partition at the same time
    // current kafka partitions = customKafkaPartitions == 0 ? all of partition of kafka topic : customKafkaPartitions
     @Override
-    protected boolean needReschedule() {
-        if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) {
-            currentKafkaPartitions = customKafkaPartitions;
-            return false;
-        } else {
-            List<Integer> newCurrentKafkaPartition;
-            try {
-                newCurrentKafkaPartition = getAllKafkaPartitions();
-            } catch (Exception e) {
-                LOG.warn("Job {} failed to fetch all current partition", id);
+    protected boolean unprotectNeedReschedule() {
+        // only running and need_schedule job need to be changed current kafka partitions
+        if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) {
+            if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) {
+                currentKafkaPartitions = customKafkaPartitions;
                 return false;
-            }
-            if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) {
-                if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) {
+            } else {
+                List<Integer> newCurrentKafkaPartition;
+                try {
+                    newCurrentKafkaPartition = getAllKafkaPartitions();
+                } catch (Exception e) {
+                    LOG.warn("Job {} failed to fetch all current partition", id);
+                    return false;
+                }
+                if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) {
+                    if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) {
+                        currentKafkaPartitions = newCurrentKafkaPartition;
+                        LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                                .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
+                                .add("msg", "current kafka partitions has been change")
+                                .build());
+                        return true;
+                    } else {
+                        return false;
+                    }
+                } else {
                     currentKafkaPartitions = newCurrentKafkaPartition;
+                    LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                            .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions))
+                            .add("msg", "current kafka partitions has been change")
+                            .build());
                     return true;
-                } else {
-                    return false;
                 }
-            } else {
-                currentKafkaPartitions = newCurrentKafkaPartition;
-                return true;
-            }
+            }
+        } else {
+            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                    .add("job_state", state)
+                    .add("msg", "ignore this turn of checking changed partition when job state is not running")
+                    .build());
+            return false;
         }
     }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 0d4c64e8837085..f0ee4dbfd22f79 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -30,6 +30,7 @@
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.RoutineLoadDesc;
@@ -57,7 +58,6 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -433,7 +433,7 @@ private void checkStateTransform(RoutineLoadJob.JobState desireState)
                 break;
             case STOPPED:
             case CANCELLED:
-                throw new UnsupportedOperationException("Could not transfrom " + state + " to " + desireState);
+                throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState);
             default:
                 break;
         }
@@ -456,7 +456,7 @@ private void updateNumOfData(long numOfErrorData, long numOfTotalData) {
                         .add("msg", "current error num is more then max error num, begin to pause job")
                         .build());
                 // remove all of task in jobs and change job state to paused
-                executePause("current error num of job is more then max error num");
+                updateState(JobState.PAUSED, "current error num of job is more then max error num");
             }
 
             LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
@@ -476,7 +476,7 @@ private void updateNumOfData(long numOfErrorData, long numOfTotalData) {
                         .add("msg", "current error num is more then max error num, begin to pause job")
                         .build());
                 // remove all of task in jobs and change job state to paused
-                executePause("current error num is more then max error num");
+                updateState(JobState.PAUSED, "current error num is more then max error num");
                 // reset currentTotalNum and currentErrorNum
                 currentErrorNum = 0;
                 currentTotalNum = 0;
@@ -513,7 +513,7 @@ public void beforeAborted(TransactionState txnState, String txnStatusChangeReaso
         readLock();
         try {
            String taskId = txnState.getLabel();
-            if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().toString().equals(taskId))) {
+            if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> DebugUtil.printId(entity.getId()).equals(taskId))) {
                 LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
                         .add("txn_id", txnState.getTransactionId())
                         .add("msg", "task will be aborted")
@@ -531,7 +531,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
             // check if task has been aborted
             Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream()
-                    .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst();
+                    .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst();
             if (!routineLoadTaskInfoOptional.isPresent()) {
                 throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed"
                         + " while task " + txnState.getLabel() + "has been aborted ");
@@ -541,6 +541,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti
         }
     }
 
+    // the task is committed when the correct number of rows is more than 0
     @Override
     public void onCommitted(TransactionState txnState) throws TransactionException {
         writeLock();
@@ -548,7 +549,7 @@ public void onCommitted(TransactionState txnState) throws TransactionException {
             // step0: find task in job
             Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream()
-                    .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst();
+                    .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst();
             if (routineLoadTaskInfoOptional.isPresent()) {
                 executeCommitTask(routineLoadTaskInfoOptional.get(), txnState);
             } else {
@@ -572,27 +573,28 @@ public void onCommitted(TransactionState txnState) throws TransactionException {
         }
     }
 
+    // the task is aborted when the correct number of rows is more than 0
+    // be will abort txn when all of kafka data is wrong or total consume data is 0
     // txn will be aborted but progress will be update
-    // be will abort txn when all of kafka data is wrong
     // progress will be update otherwise the progress will be hung
     @Override
     public void onAborted(TransactionState txnState, String txnStatusChangeReason) {
         if (txnStatusChangeReason != null) {
             LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
                     .add("txn_id", txnState.getTransactionId())
-                    .add("msg", "task will be reschedule when txn abort with reason " + txnStatusChangeReason)
+                    .add("msg", "txn abort with reason " + txnStatusChangeReason)
                     .build());
         } else {
             LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
                     .add("txn_id", txnState.getTransactionId())
-                    .add("msg", "task will be reschedule when txn abort").build());
+                    .add("msg", "txn abort").build());
         }
         writeLock();
         try {
             // step0: find task in job
             Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream()
-                    .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst();
+                    .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst();
             if (routineLoadTaskInfoOptional.isPresent()) {
                 // todo(ml): use previous be id depend on change reason
                 executeCommitTask(routineLoadTaskInfoOptional.get(), txnState);
@@ -625,8 +627,8 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact
                     .add("msg", "commit task will be ignore when attachment txn of task is null,"
                             + " maybe task was committed by master when timeout")
                     .build());
-        } else {
-            // step1: update job progress
+        } else if (checkCommitInfo(rlTaskTxnCommitAttachment)) {
+            // step2: update job progress
             updateProgress(rlTaskTxnCommitAttachment);
         }
 
@@ -637,6 +639,9 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact
         }
     }
 
+    // check the correctness of commit info
+    abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment);
+
     protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException {
         // check table belong to db, partitions belong to table
         if (stmt.getRoutineLoadDesc() == null) {
@@ -762,19 +767,24 @@ public void update() {
         }
 
         // check if partition has been changed
-        if (needReschedule()) {
-            LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
-                    .add("msg", "Job need to be rescheduled")
-                    .build());
-            executeUpdate();
-            updateState(JobState.NEED_SCHEDULE);
+        writeLock();
+        try {
+            if (unprotectNeedReschedule()) {
+                LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                        .add("msg", "Job need to be rescheduled")
+                        .build());
+                unprotectUpdateProgress();
+                executeNeedSchedule();
+            }
+        } finally {
+            writeUnlock();
         }
     }
 
-    protected void executeUpdate() {
+    protected void unprotectUpdateProgress() {
     }
 
-    protected boolean needReschedule() {
+    protected boolean unprotectNeedReschedule() {
         return false;
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 4d0af1a5c535a2..c55bcabdfe94f6 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -23,6 +23,7 @@
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
@@ -100,7 +101,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
         // begin a txn for task
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
-                routineLoadJob.getDbId(), id.toString(), -1, "streamLoad",
+                routineLoadJob.getDbId(), DebugUtil.printId(id), -1, "streamLoad",
                 TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob);
     }
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java
new file mode 100644
index 00000000000000..0e3006b5c74997
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.doris.load.routineload;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class KafkaProducerTest {
+
+    public Producer<Long, String> createProducer() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.74.167.16:8792");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client1");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+
+    public static void main(String[] args) throws InterruptedException {
+        KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
+        Producer<Long, String> kafkaProducer = kafkaProducerTest.createProducer();
+        int i = 411;
+        while (true) {
+            String value = String.valueOf(i);
+//            if (i % 5 == 0) {
+//                value = value + "\t" + value;
+//            } else if (i % 6 == 0) {
+//                value = value + "\t" + value;
+//            }
+            ProducerRecord<Long, String> record = new ProducerRecord<>("miaoling", value);
+            try {
+                RecordMetadata metadata = kafkaProducer.send(record).get();
+                System.out.println("Record send with value " + value + " to partition " + metadata.partition() + " with offset " + metadata.offset());
+            } catch (ExecutionException e) {
+                System.out.println("Error in sending record " + value);
+                System.out.println(e);
+            } catch (InterruptedException e) {
+                System.out.println("Error in sending record " + value);
+                System.out.println(e);
+            }
+            i++;
+            Thread.sleep(500);
+
+        }
+    }
+
+}
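
A note on the id-formatting changes above: the BE's UniqueId::to_string() and the FE's DebugUtil.printId(...) now both join the high and low 64 bits with "-", and RoutineLoadTaskInfo.beginTxn() writes the transaction label with the same DebugUtil.printId(id) call that beforeAborted/beforeCommitted/onCommitted/onAborted later use to match txnState.getLabel() back to a task. The sketch below only illustrates that label round-trip with plain java.util.UUID; the class name TaskLabelSketch is hypothetical and not part of Doris.

    import java.util.UUID;

    public class TaskLabelSketch {
        // Format a task id the way the new DebugUtil.printId(UUID) does:
        // hex of the high 64 bits, '-', hex of the low 64 bits.
        static String printId(UUID id) {
            return Long.toHexString(id.getMostSignificantBits()) + "-"
                    + Long.toHexString(id.getLeastSignificantBits());
        }

        public static void main(String[] args) {
            UUID taskId = UUID.randomUUID();
            String txnLabel = printId(taskId);      // label used when beginning the txn
            // A later callback can only find the task again if it prints the id identically.
            System.out.println(txnLabel.equals(printId(taskId))); // true
        }
    }

The point of routing every comparison through one formatter is that a single representation (rather than a mix of UUID.toString(), TUniqueId.toString(), and hex printing) is used both when the label is created and when it is looked up.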