diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 7eee9dcf259e..367b20173285 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1887,22 +1887,25 @@ private boolean verifyInitialRecordAndSkipExclusivePartition( { // Check only for the first record among the record batch. if (initialOffsetsSnapshot.contains(record.getPartitionId())) { - final SequenceOffsetType currOffset = currOffsets.get(record.getPartitionId()); - if (currOffset != null) { - final OrderedSequenceNumber recordSequenceNumber = createSequenceNumber( - record.getSequenceNumber() - ); - final OrderedSequenceNumber currentSequenceNumber = createSequenceNumber( - currOffset + final SequenceOffsetType currOffset = Preconditions.checkNotNull( + currOffsets.get(record.getPartitionId()), + "Current offset is null for sequenceNumber[%s] and partitionId[%s]", + record.getSequenceNumber(), + record.getPartitionId() + ); + final OrderedSequenceNumber recordSequenceNumber = createSequenceNumber( + record.getSequenceNumber() + ); + final OrderedSequenceNumber currentSequenceNumber = createSequenceNumber( + currOffset + ); + if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) { + throw new ISE( + "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition[%s]", + record.getSequenceNumber(), + currOffset, + record.getPartitionId() ); - if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) { - throw new ISE( - "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition [%s]", - record.getSequenceNumber(), - currOffset, - record.getPartitionId() - ); - } } // Remove the mark to notify that this partition has been read. @@ -1910,7 +1913,7 @@ private boolean verifyInitialRecordAndSkipExclusivePartition( // check exclusive starting sequence if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) { - log.warn("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId()); + log.info("Skipping starting sequenceNumber for partition[%s] marked exclusive", record.getPartitionId()); return false; }