diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index ae092d5d6b62..9e38a97e1369 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -84,7 +84,7 @@ public IncrementalPublishingKafkaIndexTaskRunner( } @Override - protected Long getSequenceNumberToStoreAfterRead(@NotNull Long sequenceNumber) + protected Long getNextStartOffset(@NotNull Long sequenceNumber) { return sequenceNumber + 1; } @@ -209,17 +209,11 @@ protected void possiblyResetDataSourceMetadata( } @Override - protected boolean isEndSequenceOffsetsExclusive() + protected boolean isEndOffsetExclusive() { return true; } - @Override - protected boolean isStartingSequenceOffsetsExclusive() - { - return false; - } - @Override protected boolean isEndOfShard(Long seqNum) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java index b4fa5ccdf6ee..911a64a8a04a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java @@ -706,18 +706,11 @@ protected void possiblyResetDataSourceMetadata( } @Override - protected boolean isEndSequenceOffsetsExclusive() + protected boolean isEndOffsetExclusive() { - return false; + return true; } - @Override - protected boolean isStartingSequenceOffsetsExclusive() - { - return false; - } - - @Override protected SeekableStreamPartitions deserializePartitionsFromMetadata( ObjectMapper mapper, @@ -805,7 +798,7 @@ private void requestPause() } @Override - protected Long getSequenceNumberToStoreAfterRead(Long sequenceNumber) + protected Long getNextStartOffset(Long sequenceNumber) { throw new UnsupportedOperationException(); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java index 50326f751ce1..247f6d785491 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java @@ -78,7 +78,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner streamPartition = StreamPartition.of(stream, shardId1); + recordSupplier.assign(ImmutableSet.of(streamPartition)); + expectLastCall(); + recordSupplier.seek(streamPartition, "2"); + expectLastCall(); expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 4)) .once() .andReturn(Collections.emptyList()) @@ -2157,16 +2159,13 @@ public void testRestore() throws Exception verifyAll(); reset(recordSupplier); - recordSupplier.assign(anyObject()); - expectLastCall().anyTimes(); - - 
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); - - recordSupplier.seek(anyObject(), anyString()); - expectLastCall().anyTimes(); - + recordSupplier.assign(ImmutableSet.of(streamPartition)); + expectLastCall(); + recordSupplier.seek(streamPartition, "3"); + expectLastCall(); expect(recordSupplier.poll(anyLong())).andReturn(records.subList(3, 6)).once(); - + recordSupplier.assign(ImmutableSet.of()); + expectLastCall(); recordSupplier.close(); expectLastCall(); @@ -2248,8 +2247,6 @@ public void testRestoreAfterPersistingSequences() throws Exception recordSupplier.assign(anyObject()); expectLastCall().anyTimes(); - expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); - recordSupplier.seek(anyObject(), anyString()); expectLastCall().anyTimes(); @@ -2321,9 +2318,6 @@ public void testRestoreAfterPersistingSequences() throws Exception recordSupplier.assign(anyObject()); expectLastCall().anyTimes(); - expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); - - recordSupplier.seek(anyObject(), anyString()); expectLastCall().anyTimes(); @@ -2377,7 +2371,7 @@ public void testRestoreAfterPersistingSequences() throws Exception Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed()); Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable()); Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway()); - Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(2, task2.getRunner().getRowIngestionMeters().getProcessed()); Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable()); Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway()); @@ -2386,8 +2380,9 @@ public void testRestoreAfterPersistingSequences() throws Exception SegmentDescriptor desc2 = sd(task1, "2009/P1D", 0); SegmentDescriptor desc3 = sd(task1, "2010/P1D", 0); SegmentDescriptor desc4 = sd(task1, "2011/P1D", 0); - SegmentDescriptor desc5 = sd(task1, "2013/P1D", 0); - Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5), publishedDescriptors()); + SegmentDescriptor desc5 = sd(task1, "2012/P1D", 0); + SegmentDescriptor desc6 = sd(task1, "2013/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6), publishedDescriptors()); Assert.assertEquals( new KinesisDataSourceMetadata( new SeekableStreamPartitions<>(stream, ImmutableMap.of( @@ -2401,14 +2396,11 @@ public void testRestoreAfterPersistingSequences() throws Exception @Test(timeout = 120_000L) public void testRunWithPauseAndResume() throws Exception { - recordSupplier.assign(anyObject()); - expectLastCall().anyTimes(); - - expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); - - recordSupplier.seek(anyObject(), anyString()); - expectLastCall().anyTimes(); - + final StreamPartition streamPartition = StreamPartition.of(stream, shardId1); + recordSupplier.assign(ImmutableSet.of(streamPartition)); + expectLastCall(); + recordSupplier.seek(streamPartition, "2"); + expectLastCall(); expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 5)) .once() .andReturn(Collections.emptyList()) @@ -2475,14 +2467,8 @@ public void testRunWithPauseAndResume() throws Exception reset(recordSupplier); - recordSupplier.assign(anyObject()); - expectLastCall().anyTimes(); - - 
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); - - recordSupplier.seek(anyObject(), anyString()); - expectLastCall().anyTimes(); - + recordSupplier.assign(ImmutableSet.of()); + expectLastCall(); recordSupplier.close(); expectLastCall().once(); @@ -2546,8 +2532,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception final TreeMap> sequences = new TreeMap<>(); // Here the sequence number is 1 meaning that one incremental handoff was done by the failed task - // and this task should start reading from stream 2 for partition 0 - sequences.put(1, ImmutableMap.of(shardId1, "2")); + // and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive) + sequences.put(1, ImmutableMap.of(shardId1, "1")); final Map context = new HashMap<>(); context.put("checkpoints", objectMapper.writerWithType(new TypeReference>>() { @@ -2784,7 +2770,7 @@ private ListenableFuture runTask(final Task task) throw new ISE("Task is not ready"); } } - catch (Exception e) { + catch (Throwable e) { log.warn(e, "Task failed"); return TaskStatus.failure(task.getId(), Throwables.getStackTraceAsString(e)); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 4fc76406f715..ee9a7e2453e7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -142,6 +142,12 @@ public enum Status static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions"; private final Map endOffsets; + + // lastReadOffsets are the last offsets that were read and processed. + private final Map lastReadOffsets = new HashMap<>(); + + // currOffsets are what should become the start offsets of the next reader, if we stopped reading now. They are + // initialized to the start offsets when the task begins. 
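+  // Illustrative example (not part of this patch): with Kafka-style offsets, after reading offset 7 the runner holds
+  // lastReadOffsets[partition] = 7 and currOffsets[partition] = 8, the next offset to start from. With Kinesis-style
+  // sequence numbers, both maps hold the same value after a read, because the next start sequence cannot be computed
+  // locally (see getNextStartOffset).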
private final ConcurrentMap currOffsets = new ConcurrentHashMap<>(); private final ConcurrentMap lastPersistedOffsets = new ConcurrentHashMap<>(); @@ -192,8 +198,6 @@ public enum Status private final Set publishingSequences = Sets.newConcurrentHashSet(); private final List> publishWaitList = new ArrayList<>(); private final List> handOffWaitList = new ArrayList<>(); - private final Set initialOffsetsSnapshot = new HashSet<>(); - private final Set exclusiveStartingPartitions = new HashSet<>(); private volatile DateTime startTime; private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread) @@ -272,7 +276,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception Map.Entry> previous = sequenceOffsets.next(); while (sequenceOffsets.hasNext()) { Map.Entry> current = sequenceOffsets.next(); - sequences.add(new SequenceMetadata<>( + addSequence(new SequenceMetadata<>( previous.getKey(), StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), @@ -283,7 +287,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception previous = current; exclusive = true; } - sequences.add(new SequenceMetadata<>( + addSequence(new SequenceMetadata<>( previous.getKey(), StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), @@ -292,7 +296,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception exclusive ? previous.getValue().keySet() : null )); } else { - sequences.add(new SequenceMetadata<>( + addSequence(new SequenceMetadata<>( 0, StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), ioConfig.getStartPartitions().getPartitionSequenceNumberMap(), @@ -408,6 +412,21 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception ); } + // Initialize lastReadOffsets immediately after restoring currOffsets. This is only done when end offsets are + // inclusive, because the point of initializing lastReadOffsets here is so we know when to skip the start record. + // When end offsets are exclusive, we never skip the start record. + if (!isEndOffsetExclusive()) { + for (Map.Entry entry : currOffsets.entrySet()) { + final boolean isAtStart = entry.getValue().equals( + ioConfig.getStartPartitions().getPartitionSequenceNumberMap().get(entry.getKey()) + ); + + if (!isAtStart || ioConfig.getExclusiveStartSequenceNumberPartitions().contains(entry.getKey())) { + lastReadOffsets.put(entry.getKey(), entry.getValue()); + } + } + } + // Set up committer. final Supplier committerSupplier = () -> { final Map snapshot = ImmutableMap.copyOf(currOffsets); @@ -450,17 +469,14 @@ public void run() status = Status.READING; Throwable caughtExceptionInner = null; - initialOffsetsSnapshot.addAll(currOffsets.keySet()); - exclusiveStartingPartitions.addAll(ioConfig.getExclusiveStartSequenceNumberPartitions()); - try { while (stillReading) { if (possiblyPause()) { // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign - // partitions upon resuming. This is safe even if the end sequences have not been modified. + // partitions upon resuming. Don't call "seekToStartingSequence" after "assignPartitions", because there's + // no need to re-seek here. All we're going to be doing is dropping partitions. 
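+          // For example (illustrative, not from this patch): if a setEndOffsets() call finished a partition while we
+          // were paused, the recomputed assignment simply omits that partition; partitions that stay assigned are
+          // assumed to keep their current read positions in the record supplier, so no seek is needed.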
assignment = assignPartitions(recordSupplier); possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment, currOffsets); - seekToStartingSequence(recordSupplier, assignment); if (assignment.isEmpty()) { log.info("All partitions have been fully read"); @@ -498,34 +514,17 @@ public void run() SequenceMetadata sequenceToCheckpoint = null; for (OrderedPartitionableRecord record : records) { - - - // for Kafka, the end offsets are exclusive, so skip it - if (isEndSequenceOffsetsExclusive() && - createSequenceNumber(record.getSequenceNumber()).compareTo( - createSequenceNumber(endOffsets.get(record.getPartitionId()))) >= 0) { - continue; - } - - // for the first message we receive, check that we were given a message with a sequenceNumber that matches - // our expected starting sequenceNumber - if (!verifyInitialRecordAndSkipExclusivePartition(record)) { - continue; - } + final boolean shouldProcess = verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber()); log.trace( - "Got stream[%s] partition[%s] sequence[%s].", + "Got stream[%s] partition[%s] sequenceNumber[%s], shouldProcess[%s].", record.getStream(), record.getPartitionId(), - record.getSequenceNumber() + record.getSequenceNumber(), + shouldProcess ); - if (isEndOfShard(record.getSequenceNumber())) { - // shard is closed, applies to Kinesis only - currOffsets.put(record.getPartitionId(), record.getSequenceNumber()); - } else if (createSequenceNumber(record.getSequenceNumber()).compareTo( - createSequenceNumber(endOffsets.get(record.getPartitionId()))) <= 0) { - + if (shouldProcess) { try { final List valueBytess = record.getData(); final List rows; @@ -547,7 +546,7 @@ public void run() if (sequenceToUse == null) { throw new ISE( - "WTH?! cannot find any valid sequence for record with partition [%d] and sequence [%d]. Current sequences: %s", + "WTH?! cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s", record.getPartitionId(), record.getSequenceNumber(), sequences @@ -617,12 +616,18 @@ public void onFailure(Throwable t) // in kafka, we can easily get the next offset by adding 1, but for kinesis, there's no way // to get the next sequence number without having to make an expensive api call. So the behavior // here for kafka is to +1 while for kinesis we simply save the current sequence number - currOffsets.put(record.getPartitionId(), getSequenceNumberToStoreAfterRead(record.getSequenceNumber())); + lastReadOffsets.put(record.getPartitionId(), record.getSequenceNumber()); + currOffsets.put(record.getPartitionId(), getNextStartOffset(record.getSequenceNumber())); } - if ((currOffsets.get(record.getPartitionId()).equals(endOffsets.get(record.getPartitionId())) - || isEndOfShard(currOffsets.get(record.getPartitionId()))) - && assignment.remove(record.getStreamPartition())) { + // Use record.getSequenceNumber() in the moreToRead check, since currOffsets might not have been + // updated if we were skipping records for being beyond the end. 
+ final boolean moreToReadAfterThisRecord = isMoreToReadAfterReadingRecord( + record.getSequenceNumber(), + endOffsets.get(record.getPartitionId()) + ); + + if (!moreToReadAfterThisRecord && assignment.remove(record.getStreamPartition())) { log.info("Finished reading stream[%s], partition[%s].", record.getStream(), record.getPartitionId()); recordSupplier.assign(assignment); stillReading = !assignment.isEmpty(); @@ -688,11 +693,18 @@ public void onFailure(Throwable t) status = Status.PUBLISHING; } - for (SequenceMetadata sequenceMetadata : sequences) { + for (int i = 0; i < sequences.size(); i++) { + final SequenceMetadata sequenceMetadata = sequences.get(i); if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) { - // this is done to prevent checks in sequence specific commit supplier from failing - sequenceMetadata.setEndOffsets(currOffsets); - sequenceMetadata.updateAssignments(this, currOffsets); + final boolean isLast = i == (sequences.size() - 1); + if (isLast) { + // Shorten endOffsets of the last sequence to match currOffsets. + sequenceMetadata.setEndOffsets(currOffsets); + } + + // Update assignments of the sequence, which should clear them. (This will be checked later, when the + // Committer is built.) + sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord); publishingSequences.add(sequenceMetadata.getSequenceName()); // persist already done in finally, so directly add to publishQueue publishAndRegisterHandoff(sequenceMetadata); @@ -795,7 +807,7 @@ public void onFailure(Throwable t) toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); toolbox.getDataSegmentServerAnnouncer().unannounce(); } - catch (Exception e) { + catch (Throwable e) { if (caughtExceptionOuter != null) { caughtExceptionOuter.addSuppressed(e); } else { @@ -912,7 +924,7 @@ public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata) @Override public void onFailure(Throwable t) { - log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata); + log.error(t, "Error while publishing segments for sequenceNumber[%s]", sequenceMetadata); handoffFuture.setException(t); } } @@ -990,7 +1002,7 @@ private void maybePersistAndPublishSequences(Supplier committerSuppli throws InterruptedException { for (SequenceMetadata sequenceMetadata : sequences) { - sequenceMetadata.updateAssignments(this, currOffsets); + sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadBeforeReadingRecord); if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) { publishingSequences.add(sequenceMetadata.getSequenceName()); try { @@ -1016,19 +1028,21 @@ private Set> assignPartitions( { final Set> assignment = new HashSet<>(); for (Map.Entry entry : currOffsets.entrySet()) { - final SequenceOffsetType endOffset = endOffsets.get(entry.getKey()); - if (isEndOfShard(endOffset) - || SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset) - || createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(endOffset)) < 0) { - assignment.add(StreamPartition.of(stream, entry.getKey())); - } else if (entry.getValue().equals(endOffset)) { - log.info("Finished reading partition[%s].", entry.getKey()); - } else { - throw new ISE( - "WTF?! 
Cannot start from sequence[%,d] > endOffset[%,d]", - entry.getValue(), + final PartitionIdType partition = entry.getKey(); + final SequenceOffsetType currOffset = entry.getValue(); + final SequenceOffsetType endOffset = endOffsets.get(partition); + + if (!isRecordAlreadyRead(partition, endOffset) && isMoreToReadBeforeReadingRecord(currOffset, endOffset)) { + log.info( + "Adding partition[%s], start[%s] -> end[%s] to assignment.", + partition, + currOffset, endOffset ); + + assignment.add(StreamPartition.of(stream, partition)); + } else { + log.info("Finished reading partition[%s].", partition); } } @@ -1037,6 +1051,77 @@ private Set> assignPartitions( return assignment; } + private void addSequence(final SequenceMetadata sequenceMetadata) + { + // Sanity check that the start of the new sequence matches up with the end of the prior sequence. + for (Map.Entry entry : sequenceMetadata.getStartOffsets().entrySet()) { + final PartitionIdType partition = entry.getKey(); + final SequenceOffsetType startOffset = entry.getValue(); + + if (!sequences.isEmpty()) { + final SequenceOffsetType priorOffset = sequences.get(sequences.size() - 1).endOffsets.get(partition); + + if (!startOffset.equals(priorOffset)) { + throw new ISE( + "New sequence startOffset[%s] does not equal expected prior offset[%s]", + startOffset, + priorOffset + ); + } + } + } + + // Actually do the add. + sequences.add(sequenceMetadata); + } + + /** + * Returns true if the given record has already been read, based on lastReadOffsets. + */ + private boolean isRecordAlreadyRead( + final PartitionIdType recordPartition, + final SequenceOffsetType recordSequenceNumber + ) + { + final SequenceOffsetType lastReadOffset = lastReadOffsets.get(recordPartition); + + if (lastReadOffset == null) { + return false; + } else { + return createSequenceNumber(recordSequenceNumber).compareTo(createSequenceNumber(lastReadOffset)) <= 0; + } + } + + /** + * Returns true if, given that we want to start reading from recordSequenceNumber and end at endSequenceNumber, there + * is more left to read. Used in pre-read checks to determine if there is anything left to read. + */ + private boolean isMoreToReadBeforeReadingRecord( + final SequenceOffsetType recordSequenceNumber, + final SequenceOffsetType endSequenceNumber + ) + { + final int compareToEnd = createSequenceNumber(recordSequenceNumber) + .compareTo(createSequenceNumber(endSequenceNumber)); + + return isEndOffsetExclusive() ? compareToEnd < 0 : compareToEnd <= 0; + } + + /** + * Returns true if, given that recordSequenceNumber has already been read and we want to end at endSequenceNumber, + * there is more left to read. Used in post-read checks to determine if there is anything left to read. + */ + private boolean isMoreToReadAfterReadingRecord( + final SequenceOffsetType recordSequenceNumber, + final SequenceOffsetType endSequenceNumber + ) + { + final int compareNextToEnd = createSequenceNumber(getNextStartOffset(recordSequenceNumber)) + .compareTo(createSequenceNumber(endSequenceNumber)); + + // Unlike isMoreToReadBeforeReadingRecord, we don't care if the end is exclusive or not. If we read it, we're done. 
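+    // Worked example (illustrative): with Kafka-style offsets and endOffset 5, after reading offset 4 we have
+    // getNextStartOffset(4) = 5 and compareNextToEnd = 0, so reading stops (offset 5 is exclusive and never read).
+    // With Kinesis-style sequence numbers and endOffset "5", after reading "4" we have getNextStartOffset("4") = "4"
+    // and compareNextToEnd < 0, so reading continues until "5" itself has been read.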
+ return compareNextToEnd < 0; + } private void seekToStartingSequence( RecordSupplier recordSupplier, @@ -1045,7 +1130,7 @@ private void seekToStartingSequence( { for (final StreamPartition partition : partitions) { final SequenceOffsetType sequence = currOffsets.get(partition.getPartitionId()); - log.info("Seeking partition[%s] to sequence[%s].", partition.getPartitionId(), sequence); + log.info("Seeking partition[%s] to sequenceNumber[%s].", partition.getPartitionId(), sequence); recordSupplier.seek(partition, sequence); } } @@ -1104,7 +1189,7 @@ private void handleParseException(ParseException pe, OrderedPartitionableRecord if (tuningConfig.isLogParseExceptions()) { log.error( pe, - "Encountered parse exception on row from partition[%s] sequence[%s]", + "Encountered parse exception on row from partition[%s] sequenceNumber[%s]", record.getPartitionId(), record.getSequenceNumber() ); @@ -1366,17 +1451,20 @@ public Response setEndOffsets( Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No Sequences found to set end sequences"); final SequenceMetadata latestSequence = sequences.get(sequences.size() - 1); - // if a partition has not been read yet (contained in initialOffsetsSnapshot), then - // do not mark the starting sequence number as exclusive - Set exclusivePartitions = sequenceNumbers.keySet() - .stream() - .filter(x -> !initialOffsetsSnapshot.contains(x) - || ioConfig.getExclusiveStartSequenceNumberPartitions() - .contains(x)) - .collect(Collectors.toSet()); + final Set exclusiveStartPartitions; + + if (isEndOffsetExclusive()) { + // When end offsets are exclusive, there's no need for marking the next sequence as having any + // exclusive-start partitions. It should always start from the end offsets of the prior sequence. + exclusiveStartPartitions = Collections.emptySet(); + } else { + // When end offsets are inclusive, we must mark all partitions as exclusive-start, to avoid reading + // their final messages (which have already been read). 
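+      // For example (illustrative): if the prior sequence was checkpointed with a shard ending at sequenceNumber
+      // "100", the new sequence also starts that shard at "100" but marks it exclusive-start, so record "100" is not
+      // read and published a second time.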
+ exclusiveStartPartitions = sequenceNumbers.keySet(); + } if ((latestSequence.getStartOffsets().equals(sequenceNumbers) - && latestSequence.getExclusiveStartPartitions().equals(exclusivePartitions) + && latestSequence.getExclusiveStartPartitions().equals(exclusiveStartPartitions) && !finish) || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) { log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers); @@ -1416,19 +1504,17 @@ public Response setEndOffsets( log.info("Updating endOffsets from [%s] to [%s]", endOffsets, sequenceNumbers); endOffsets.putAll(sequenceNumbers); } else { - exclusiveStartingPartitions.addAll(exclusivePartitions); - // create new sequence + log.info("Creating new sequence with startOffsets [%s] and endOffsets [%s]", sequenceNumbers, endOffsets); final SequenceMetadata newSequence = new SequenceMetadata<>( latestSequence.getSequenceId() + 1, StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1), sequenceNumbers, endOffsets, false, - exclusivePartitions + exclusiveStartPartitions ); - sequences.add(newSequence); - initialOffsetsSnapshot.addAll(sequenceNumbers.keySet()); + addSequence(newSequence); } persistSequences(); } @@ -1582,45 +1668,47 @@ public DateTime getStartTime(@Context final HttpServletRequest req) return startTime; } - private boolean verifyInitialRecordAndSkipExclusivePartition( - final OrderedPartitionableRecord record + /** + * This method does two things: + * + * 1) Verifies that the sequence numbers we read are at least as high as those read previously, and throws an + * exception if not. + * 2) Returns false if we should skip this record because it's either (a) the first record in a partition that we are + * needing to be exclusive on; (b) too late to read, past the endOffsets. + */ + private boolean verifyRecordInRange( + final PartitionIdType partition, + final SequenceOffsetType recordOffset ) { - // Check only for the first record among the record batch. - if (initialOffsetsSnapshot.contains(record.getPartitionId())) { - final SequenceOffsetType currOffset = Preconditions.checkNotNull( - currOffsets.get(record.getPartitionId()), - "Current offset is null for sequenceNumber[%s] and partitionId[%s]", - record.getSequenceNumber(), - record.getPartitionId() - ); - final OrderedSequenceNumber recordSequenceNumber = createSequenceNumber( - record.getSequenceNumber() - ); - final OrderedSequenceNumber currentSequenceNumber = createSequenceNumber( - currOffset - ); - if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) { - throw new ISE( - "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition[%s]", - record.getSequenceNumber(), - currOffset, - record.getPartitionId() - ); - } + // Verify that the record is at least as high as its currOffset. + final SequenceOffsetType currOffset = Preconditions.checkNotNull( + currOffsets.get(partition), + "Current offset is null for sequenceNumber[%s] and partition[%s]", + recordOffset, + partition + ); - // Remove the mark to notify that this partition has been read. 
- initialOffsetsSnapshot.remove(record.getPartitionId()); + final OrderedSequenceNumber recordSequenceNumber = createSequenceNumber(recordOffset); + final OrderedSequenceNumber currentSequenceNumber = createSequenceNumber(currOffset); - // check exclusive starting sequence - if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) { - log.info("Skipping starting sequenceNumber for partition[%s] marked exclusive", record.getPartitionId()); + final int comparisonToCurrent = recordSequenceNumber.compareTo(currentSequenceNumber); + if (comparisonToCurrent < 0) { + throw new ISE( + "Record sequenceNumber[%s] is smaller than current sequenceNumber[%s] for partition[%s]", + recordOffset, + currOffset, + partition + ); + } - return false; - } + // Check if the record has already been read. + if (isRecordAlreadyRead(partition, recordOffset)) { + return false; } - return true; + // Finally, check if this record comes before the endOffsets for this partition. + return isMoreToReadBeforeReadingRecord(recordSequenceNumber.get(), endOffsets.get(partition)); } /** @@ -1645,16 +1733,14 @@ protected abstract TreeMap> ge ) throws IOException; /** - * Calculates the sequence number used to update `currentOffsets` after finished reading a record. - * In Kafka this returns sequenceNumeber + 1 since that's the next expected offset - * In Kinesis this simply returns sequenceNumber, since the sequence numbers in Kinesis are not - * contiguous and finding the next sequence number requires an expensive API call + * Calculates the sequence number used to update currOffsets after finished reading a record. + * This is what would become the start offsets of the next reader, if we stopped reading now. * * @param sequenceNumber the sequence number that has already been processed * * @return next sequence number to be stored */ - protected abstract SequenceOffsetType getSequenceNumberToStoreAfterRead(SequenceOffsetType sequenceNumber); + protected abstract SequenceOffsetType getNextStartOffset(SequenceOffsetType sequenceNumber); /** * deserializes stored metadata into SeekableStreamPartitions @@ -1726,14 +1812,7 @@ protected abstract void possiblyResetDataSourceMetadata( * In Kafka, the endOffsets are exclusive, so skip it. * In Kinesis the endOffsets are inclusive */ - protected abstract boolean isEndSequenceOffsetsExclusive(); - - /** - * In Kafka, the startingOffsets are inclusive. 
- * In Kinesis, the startingOffsets are exclusive, except for the first - * partition we read from stream - */ - protected abstract boolean isStartingSequenceOffsetsExclusive(); + protected abstract boolean isEndOffsetExclusive(); protected abstract TypeReference>> getSequenceMetadataTypeReference(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java index 28f5dde79cb2..dc3ff87aff15 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java @@ -143,9 +143,9 @@ public int hashCode() @Override public String toString() { - return "SeekableStreamPartitions{" + - "stream/topic='" + stream + '\'' + - ", partitionSequenceNumberMap/partitionOffsetMap=" + partitionIdToSequenceNumberMap + + return getClass().getSimpleName() + "{" + + "stream='" + stream + '\'' + + ", partitionSequenceNumberMap=" + partitionIdToSequenceNumberMap + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index 7fbc800ac3d1..61bb35a7778a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; public class SequenceMetadata { @@ -148,17 +149,16 @@ void setEndOffsets(Map newEndOffsets) } void updateAssignments( - SeekableStreamIndexTaskRunner runner, - Map nextPartitionOffset + Map currOffsets, + BiFunction moreToReadFn ) { lock.lock(); try { assignments.clear(); - nextPartitionOffset.forEach((key, value) -> { + currOffsets.forEach((key, value) -> { SequenceOffsetType endOffset = endOffsets.get(key); - if (SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset) - || runner.createSequenceNumber(endOffset).compareTo(runner.createSequenceNumber(nextPartitionOffset.get(key))) > 0) { + if (moreToReadFn.apply(value, endOffset)) { assignments.add(key); } }); @@ -188,14 +188,15 @@ boolean canHandle( return false; } boolean ret; - if (runner.isStartingSequenceOffsetsExclusive()) { + if (!runner.isEndOffsetExclusive()) { + // Inclusive endOffsets mean that we must skip the first record of any partition that has been read before. ret = recordOffset.compareTo(partitionStartOffset) >= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 
             1 : 0);
     } else {
       ret = recordOffset.compareTo(partitionStartOffset) >= 0;
     }
-    if (runner.isEndSequenceOffsetsExclusive()) {
+    if (runner.isEndOffsetExclusive()) {
       ret &= recordOffset.compareTo(partitionEndOffset) < 0;
     } else {
       ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
index f193488240d9..74fd08d445ec 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
@@ -19,6 +19,7 @@
 package org.apache.druid.indexing.seekablestream.common;
 
+import java.util.Objects;
 
 /**
  * Represents a Kafka/Kinesis stream sequence number. Mainly used to do
@@ -51,4 +52,33 @@ public boolean isExclusive()
   {
     return isExclusive;
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OrderedSequenceNumber that = (OrderedSequenceNumber) o;
+    return isExclusive == that.isExclusive &&
+           Objects.equals(sequenceNumber, that.sequenceNumber);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(sequenceNumber, isExclusive);
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + "{" +
+           "sequenceNumber=" + sequenceNumber +
+           ", isExclusive=" + isExclusive +
+           '}';
+  }
 }
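Reviewer note: the sketch below is a minimal, standalone illustration (not part of this patch; the class and method names are hypothetical) of the end-offset arithmetic that the new isMoreToReadBeforeReadingRecord / isMoreToReadAfterReadingRecord helpers encode, for a Kafka-like runner (exclusive end offsets, next start = offset + 1) and a Kinesis-like runner (inclusive end sequence numbers, next start = same sequence number).

import java.util.function.UnaryOperator;

// Hypothetical, self-contained illustration of the end-offset semantics in SeekableStreamIndexTaskRunner.
public class OffsetSemanticsSketch
{
  // Mirrors isMoreToReadBeforeReadingRecord: may we still read a record at 'next', given 'end'?
  static boolean moreBeforeReading(long next, long end, boolean endExclusive)
  {
    return endExclusive ? next < end : next <= end;
  }

  // Mirrors isMoreToReadAfterReadingRecord: after reading 'read', is anything left before 'end'?
  static boolean moreAfterReading(long read, long end, UnaryOperator<Long> nextStartOffset)
  {
    return nextStartOffset.apply(read) < end;
  }

  public static void main(String[] args)
  {
    // Kafka-like: end offset 5 is exclusive, next start offset is read + 1. Reads offsets 2, 3, 4 and stops.
    long offset = 2;
    while (moreBeforeReading(offset, 5, true)) {
      System.out.println("kafka-like read " + offset);
      boolean more = moreAfterReading(offset, 5, x -> x + 1);
      offset = offset + 1;
      if (!more) {
        break;
      }
    }

    // Kinesis-like: end sequence 5 is inclusive, next start is the same sequence. Reads 2, 3, 4, 5 and stops.
    long seq = 2;
    while (moreBeforeReading(seq, 5, false)) {
      System.out.println("kinesis-like read " + seq);
      boolean more = moreAfterReading(seq, 5, x -> x);
      seq = seq + 1;  // stand-in for "the next record the stream hands us"
      if (!more) {
        break;
      }
    }
  }
}

Run as-is, it prints reads 2 through 4 for the Kafka-like case and 2 through 5 for the Kinesis-like case, mirroring the inclusive/exclusive behaviour described in the isEndOffsetExclusive() javadoc.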