From e98db05afafc7e810c8faaeecacdffa0b11a51c2 Mon Sep 17 00:00:00 2001 From: emmymiao87 <522274284@qq.com> Date: Mon, 11 Mar 2019 21:15:59 +0800 Subject: [PATCH] Stream load with no data will abort txn 1. stream load executor will abort txn when no correct data in task 2. change txn label to DebugUtil.print(UUID) which is same as task id printed by be 3. change print uuid to hi-lo --- be/src/runtime/routine_load/data_consumer.cpp | 3 +- .../stream_load/stream_load_executor.cpp | 3 + be/src/util/uid_util.h | 2 +- .../apache/doris/common/util/DebugUtil.java | 12 ++- .../apache/doris/common/util/LogBuilder.java | 2 +- .../load/routineload/KafkaRoutineLoadJob.java | 74 ++++++++++++++----- .../load/routineload/RoutineLoadJob.java | 52 +++++++------ .../load/routineload/RoutineLoadTaskInfo.java | 3 +- .../load/routineload/KafkaProducerTest.java | 73 ++++++++++++++++++ 9 files changed, 177 insertions(+), 47 deletions(-) create mode 100644 fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 0e701b3153a4a8..e52d3332d4af36 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -145,7 +145,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { << ", batch size: " << left_bytes << ". " << ctx->brief(); - // copy one +// copy one std::map cmt_offset = ctx->kafka_info->cmt_offset; MonotonicStopWatch watch; watch.start(); @@ -171,6 +171,7 @@ Status KafkaDataConsumer::start(StreamLoadContext* ctx) { if (left_bytes == ctx->kafka_info->max_batch_size) { // nothing to be consumed, cancel it + // we do not allow finishing stream load pipe without data kakfa_pipe->cancel(); _cancelled = true; return Status::CANCELLED; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 0cae6eb87511aa..774b0d7bb47c3c 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -58,6 +58,9 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) { if ((0.0 + ctx->number_filtered_rows) / num_total_rows > ctx->max_filter_ratio) { status = Status("too many filtered rows"); } + else if(ctx->number_loaded_rows==0){ + status = Status("all partitions have no load data"); + } if (ctx->number_filtered_rows > 0 && !executor->runtime_state()->get_error_log_file_path().empty()) { diff --git a/be/src/util/uid_util.h b/be/src/util/uid_util.h index 92e2fec1cf6304..d7a73979876c4d 100644 --- a/be/src/util/uid_util.h +++ b/be/src/util/uid_util.h @@ -59,7 +59,7 @@ struct UniqueId { std::string to_string() const { char buf[33]; to_hex(hi, buf); - buf[16] = ':'; + buf[16] = '-'; to_hex(lo, buf + 17); return {buf, 33}; } diff --git a/fe/src/main/java/org/apache/doris/common/util/DebugUtil.java b/fe/src/main/java/org/apache/doris/common/util/DebugUtil.java index 5e239fcfdf2bd8..783c06d88b3068 100644 --- a/fe/src/main/java/org/apache/doris/common/util/DebugUtil.java +++ b/fe/src/main/java/org/apache/doris/common/util/DebugUtil.java @@ -24,6 +24,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.text.DecimalFormat; +import java.util.UUID; public class DebugUtil { public static final DecimalFormat DECIMAL_FORMAT_SCALE_3 = new DecimalFormat("#.000"); @@ -120,13 +121,20 @@ public static Pair getByteUint(long value) { public static String printId(final TUniqueId id) { StringBuilder 
builder = new StringBuilder(); - builder.append(Long.toHexString(id.hi)).append(":").append(Long.toHexString(id.lo)); + builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo)); + return builder.toString(); + } + + public static String printId(final UUID id) { + TUniqueId tUniqueId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits()); + StringBuilder builder = new StringBuilder(); + builder.append(Long.toHexString(tUniqueId.hi)).append("-").append(Long.toHexString(tUniqueId.lo)); return builder.toString(); } public static String printId(final PUniqueId id) { StringBuilder builder = new StringBuilder(); - builder.append(Long.toHexString(id.hi)).append(":").append(Long.toHexString(id.lo)); + builder.append(Long.toHexString(id.hi)).append("-").append(Long.toHexString(id.lo)); return builder.toString(); } diff --git a/fe/src/main/java/org/apache/doris/common/util/LogBuilder.java b/fe/src/main/java/org/apache/doris/common/util/LogBuilder.java index 755afb9b25cb57..434fa002df8b40 100644 --- a/fe/src/main/java/org/apache/doris/common/util/LogBuilder.java +++ b/fe/src/main/java/org/apache/doris/common/util/LogBuilder.java @@ -44,7 +44,7 @@ public LogBuilder(LogKey key, Long identifier) { public LogBuilder(LogKey key, UUID identifier) { TUniqueId tUniqueId = new TUniqueId(identifier.getMostSignificantBits(), identifier.getLeastSignificantBits()); - stringBuffer = new StringBuffer().append(key.name()).append("=").append(tUniqueId.toString()).append(", "); + stringBuffer = new StringBuffer().append(key.name()).append("=").append(DebugUtil.printId(tUniqueId)).append(", "); entries = Lists.newLinkedList(); } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index bb50347cb63777..757f3bbd10f222 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -17,6 +17,7 @@ package org.apache.doris.load.routineload; +import com.google.common.base.Joiner; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -26,6 +27,7 @@ import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.load.RoutineLoadDesc; @@ -34,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting; +import org.apache.doris.transaction.TransactionState; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.logging.log4j.LogManager; @@ -166,6 +169,20 @@ public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException { return Math.min(Math.min(partitionNum, Math.min(desireTaskConcurrentNum, aliveBeNum)), DEFAULT_TASK_MAX_CONCURRENT_NUM); } + @Override + boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment) { + if (rlTaskTxnCommitAttachment.getLoadedRows() > 0 + && ((KafkaProgress) rlTaskTxnCommitAttachment.getProgress()).getPartitionIdToOffset().size() == 0) { + LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, DebugUtil.printId(rlTaskTxnCommitAttachment.getTaskId())) + .add("job_id", id) + .add("loaded_rows", 
rlTaskTxnCommitAttachment.getLoadedRows()) + .add("progress_partition_offset_size", 0) + .add("msg", "commit attachment info is incorrect")); + return false; + } + return true; + } + @Override protected void updateProgress(RLTaskTxnCommitAttachment attachment) { super.updateProgress(attachment); @@ -185,7 +202,7 @@ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoad } @Override - protected void executeUpdate() { + protected void unprotectUpdateProgress() { updateNewPartitionProgress(); } @@ -195,30 +212,47 @@ protected void executeUpdate() { // update current kafka partition at the same time // current kafka partitions = customKafkaPartitions == 0 ? all of partition of kafka topic : customKafkaPartitions @Override - protected boolean needReschedule() { - if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) { - currentKafkaPartitions = customKafkaPartitions; - return false; - } else { - List newCurrentKafkaPartition; - try { - newCurrentKafkaPartition = getAllKafkaPartitions(); - } catch (Exception e) { - LOG.warn("Job {} failed to fetch all current partition", id); + protected boolean unprotectNeedReschedule() { + // only running and need_schedule job need to be changed current kafka partitions + if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { + if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) { + currentKafkaPartitions = customKafkaPartitions; return false; - } - if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) { - if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) { + } else { + List newCurrentKafkaPartition; + try { + newCurrentKafkaPartition = getAllKafkaPartitions(); + } catch (Exception e) { + LOG.warn("Job {} failed to fetch all current partition", id); + return false; + } + if (currentKafkaPartitions.containsAll(newCurrentKafkaPartition)) { + if (currentKafkaPartitions.size() > newCurrentKafkaPartition.size()) { + currentKafkaPartitions = newCurrentKafkaPartition; + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) + .add("msg", "current kafka partitions has been change") + .build()); + return true; + } else { + return false; + } + } else { currentKafkaPartitions = newCurrentKafkaPartition; + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) + .add("msg", "current kafka partitions has been change") + .build()); return true; - } else { - return false; } - } else { - currentKafkaPartitions = newCurrentKafkaPartition; - return true; - } + } + } else { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("job_state", state) + .add("msg", "ignore this turn of checking changed partition when job state is not running") + .build()); + return false; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 0d4c64e8837085..f0ee4dbfd22f79 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -30,6 +30,7 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import 
org.apache.doris.common.util.LogKey; import org.apache.doris.load.RoutineLoadDesc; @@ -57,7 +58,6 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -433,7 +433,7 @@ private void checkStateTransform(RoutineLoadJob.JobState desireState) break; case STOPPED: case CANCELLED: - throw new UnsupportedOperationException("Could not transfrom " + state + " to " + desireState); + throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState); default: break; } @@ -456,7 +456,7 @@ private void updateNumOfData(long numOfErrorData, long numOfTotalData) { .add("msg", "current error num is more then max error num, begin to pause job") .build()); // remove all of task in jobs and change job state to paused - executePause("current error num of job is more then max error num"); + updateState(JobState.PAUSED, "current error num of job is more then max error num"); } LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) @@ -476,7 +476,7 @@ private void updateNumOfData(long numOfErrorData, long numOfTotalData) { .add("msg", "current error num is more then max error num, begin to pause job") .build()); // remove all of task in jobs and change job state to paused - executePause("current error num is more then max error num"); + updateState(JobState.PAUSED, "current error num is more then max error num"); // reset currentTotalNum and currentErrorNum currentErrorNum = 0; currentTotalNum = 0; @@ -513,7 +513,7 @@ public void beforeAborted(TransactionState txnState, String txnStatusChangeReaso readLock(); try { String taskId = txnState.getLabel(); - if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().toString().equals(taskId))) { + if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> DebugUtil.printId(entity.getId()).equals(taskId))) { LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) .add("txn_id", txnState.getTransactionId()) .add("msg", "task will be aborted") @@ -531,7 +531,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti // check if task has been aborted Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream() - .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst(); + .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst(); if (!routineLoadTaskInfoOptional.isPresent()) { throw new TransactionException("txn " + txnState.getTransactionId() + " could not be committed" + " while task " + txnState.getLabel() + "has been aborted "); @@ -541,6 +541,7 @@ public void beforeCommitted(TransactionState txnState) throws TransactionExcepti } } + // the task is committed when the correct number of rows is more then 0 @Override public void onCommitted(TransactionState txnState) throws TransactionException { writeLock(); @@ -548,7 +549,7 @@ public void onCommitted(TransactionState txnState) throws TransactionException { // step0: find task in job Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream() - .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst(); + .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst(); if (routineLoadTaskInfoOptional.isPresent()) { executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); } else { @@ -572,27 +573,28 @@ public void 
onCommitted(TransactionState txnState) throws TransactionException { } } + // the task is aborted when the correct number of rows is more then 0 + // be will abort txn when all of kafka data is wrong or total consume data is 0 // txn will be aborted but progress will be update - // be will abort txn when all of kafka data is wrong // progress will be update otherwise the progress will be hung @Override public void onAborted(TransactionState txnState, String txnStatusChangeReason) { if (txnStatusChangeReason != null) { LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) .add("txn_id", txnState.getTransactionId()) - .add("msg", "task will be reschedule when txn abort with reason " + txnStatusChangeReason) + .add("msg", "txn abort with reason " + txnStatusChangeReason) .build()); } else { LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()) .add("txn_id", txnState.getTransactionId()) - .add("msg", "task will be reschedule when txn abort").build()); + .add("msg", "txn abort").build()); } writeLock(); try { // step0: find task in job Optional routineLoadTaskInfoOptional = routineLoadTaskInfoList.parallelStream() - .filter(entity -> entity.getId().toString().equals(txnState.getLabel())).findFirst(); + .filter(entity -> DebugUtil.printId(entity.getId()).equals(txnState.getLabel())).findFirst(); if (routineLoadTaskInfoOptional.isPresent()) { // todo(ml): use previous be id depend on change reason executeCommitTask(routineLoadTaskInfoOptional.get(), txnState); @@ -625,8 +627,8 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact .add("msg", "commit task will be ignore when attachment txn of task is null," + " maybe task was committed by master when timeout") .build()); - } else { - // step1: update job progress + } else if (checkCommitInfo(rlTaskTxnCommitAttachment)) { + // step2: update job progress updateProgress(rlTaskTxnCommitAttachment); } @@ -637,6 +639,9 @@ private void executeCommitTask(RoutineLoadTaskInfo routineLoadTaskInfo, Transact } } + // check the correctness of commit info + abstract boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment); + protected static void unprotectCheckCreate(CreateRoutineLoadStmt stmt) throws AnalysisException { // check table belong to db, partitions belong to table if (stmt.getRoutineLoadDesc() == null) { @@ -762,19 +767,24 @@ public void update() { } // check if partition has been changed - if (needReschedule()) { - LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("msg", "Job need to be rescheduled") - .build()); - executeUpdate(); - updateState(JobState.NEED_SCHEDULE); + writeLock(); + try { + if (unprotectNeedReschedule()) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("msg", "Job need to be rescheduled") + .build()); + unprotectUpdateProgress(); + executeNeedSchedule(); + } + } finally { + writeUnlock(); } } - protected void executeUpdate() { + protected void unprotectUpdateProgress() { } - protected boolean needReschedule() { + protected boolean unprotectNeedReschedule() { return false; } } diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java index 4d0af1a5c535a2..c55bcabdfe94f6 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java @@ -23,6 +23,7 @@ import 
org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
@@ -100,7 +101,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
         // begin a txn for task
         RoutineLoadJob routineLoadJob = routineLoadManager.getJob(jobId);
         txnId = Catalog.getCurrentGlobalTransactionMgr().beginTransaction(
-                routineLoadJob.getDbId(), id.toString(), -1, "streamLoad",
+                routineLoadJob.getDbId(), DebugUtil.printId(id), -1, "streamLoad",
                 TransactionState.LoadJobSourceType.ROUTINE_LOAD_TASK, routineLoadJob);
     }
diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java
new file mode 100644
index 00000000000000..0e3006b5c74997
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaProducerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.doris.load.routineload;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+public class KafkaProducerTest {
+
+    public Producer<Long, String> createProducer() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.74.167.16:8792");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client1");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+
+    public static void main(String[] args) throws InterruptedException {
+        KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();
+        Producer<Long, String> kafkaProducer = kafkaProducerTest.createProducer();
+        int i = 411;
+        while (true) {
+            String value = String.valueOf(i);
+//            if (i % 5 == 0) {
+//                value = value + "\t" + value;
+//            } else if (i % 6 == 0) {
+//                value = value + "\t" + value;
+//            }
+            ProducerRecord<Long, String> record = new ProducerRecord<>("miaoling", value);
+            try {
+                RecordMetadata metadata = kafkaProducer.send(record).get();
+                System.out.println("Record send with value " + value + " to partition " + metadata.partition() + " with offset " + metadata.offset());
+            } catch (ExecutionException e) {
+                System.out.println("Error in sending record " + value);
+                System.out.println(e);
+            } catch (InterruptedException e) {
+                System.out.println("Error in sending record " + value);
+                System.out.println(e);
+            }
+            i++;
+            Thread.sleep(500);
+
+        }
+    }
+
+}