diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index 1952ccaf33252d..0619188ea98ea9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -17,6 +17,7 @@ package org.apache.doris.load.routineload; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugUtil; @@ -36,6 +37,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; /** * this is description of kafka routine load progress @@ -58,6 +60,8 @@ public class KafkaProgress extends RoutineLoadProgress { @SerializedName(value = "pito") private ConcurrentMap partitionIdToOffset = Maps.newConcurrentMap(); + private ReentrantLock lock = new ReentrantLock(true); + public KafkaProgress() { super(LoadDataSourceType.KAFKA); } @@ -208,9 +212,24 @@ public String toJsonString() { @Override public void update(RLTaskTxnCommitAttachment attachment) { KafkaProgress newProgress = (KafkaProgress) attachment.getProgress(); + // + 1 to point to the next msg offset to be consumed - newProgress.partitionIdToOffset.entrySet().stream() - .forEach(entity -> this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1)); + if (Config.isCloudMode()) { + lock.lock(); + try { + newProgress.partitionIdToOffset.forEach((partitionId, newOffset) -> { + this.partitionIdToOffset.compute(partitionId, (key, oldOffset) -> { + return (oldOffset == null || newOffset + 1 > oldOffset) ? newOffset + 1 : oldOffset; + }); + }); + } finally { + lock.unlock(); + } + } else { + newProgress.partitionIdToOffset.entrySet().stream() + .forEach(entity -> this.partitionIdToOffset.put(entity.getKey(), entity.getValue() + 1)); + } + if (LOG.isDebugEnabled()) { LOG.debug("update kafka progress: {}, task: {}, job: {}", newProgress.toJsonString(), DebugUtil.printId(attachment.getTaskId()), attachment.getJobId());