From 7bade85b4c45709d1f0dd8cf384a5215565c8e44 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Thu, 14 Mar 2019 09:33:03 -0400
Subject: [PATCH 1/2] Logic adjustments to SeekableStreamIndexTaskRunner.

A mix of simplifications and bug fixes. They are intermingled, because some
of the bugs were made difficult to fix, and also more likely to happen in
the first place, by how the code was structured. I tried to keep
restructuring to a minimum. The changes are:

- Remove "initialOffsetsSnapshot", which was used to determine when to
  skip start offsets. Replace it with "lastReadOffsets", which I hope is
  more intuitive. (There is a connection: start offsets must be skipped if,
  and only if, they have already been read, either by a previous task or by
  a previous sequence in the same task, post-restoring.) The restore-time
  seeding rule is sketched after this list.
- Remove "isStartingSequenceOffsetsExclusive", because it should always be
  the opposite of isEndOffsetExclusive. The reason is that starts are
  exclusive exactly when the prior ends are inclusive: they must match up
  in that way for adjacent reads to link up properly. (See the linking
  sketch after this list.)
- Don't call "seekToStartingSequence" after the initial seek. There is no
  reason to, since we expect to read continuous message streams throughout
  the task. And calling it makes offset-tracking logic trickier, so better
  to avoid the need for trickiness. I believe the call being here was
  causing a bug in Kinesis ingestion where a message might get double-read.
- Remove the "continue" calls in the main read loop. They are bad because
  they prevent keeping currOffsets and lastReadOffsets up to date, and
  prevent us from detecting that we have finished reading.
- Rework "verifyInitialRecordAndSkipExclusivePartition" into
  "verifyRecordInRange". It no longer has side effects. It does a sanity
  check on the message offset and also makes sure that it is not past the
  endOffsets.
- Rework "assignPartitions" to replace inline comparisons with
  "isRecordAlreadyRead" and "isMoreToReadBeforeReadingRecord" calls
  (modeled after this list). I believe this fixes an off-by-one error with
  Kinesis where the last record would not get read. It also makes the
  logic easier to read.
- When doing the final publish, only adjust end offsets of the final
  sequence, rather than potentially adjusting any unpublished sequence.
  Adjusting sequences other than the last one is a mistake, since it would
  extend their endOffsets beyond what they actually read. (I'm not sure if
  this was an issue in practice, since I'm not sure if real world
  situations would have more than one unpublished sequence.)
- Rename "isEndSequenceOffsetsExclusive" to "isEndOffsetExclusive". It's
  shorter and clearer, I think.
- Add equals/hashCode/toString methods to OrderedSequenceNumber.

Kafka test changes:

- Added a Kafka "testRestoreAtEndOffset" test to verify that restores at
  the very end of the task lifecycle still work properly.

Kinesis test changes:

- Renamed "testRunOnNothing" to "testRunOnSingletonRange". I think that,
  given Kinesis semantics, the right behavior when the start offset equals
  the end offset (and there aren't exclusive partitions set) is to read
  that single offset. This is because both offsets are meant to be treated
  as inclusive.
- Adjusted "testRestoreAfterPersistingSequences" to expect one more message
  read. I believe the old test was wrong; it expected the task not to read
  message number 5.
- Adjusted "testRunContextSequenceAheadOfStartingOffsets" to use a
  checkpoint starting from 1 rather than 2. I believe the old test was
  wrong here too; it was expecting the task to start reading from the
  checkpointed offset, but it actually should have started reading from one
  past the checkpointed offset.
- Adjusted "testIncrementalHandOffReadsThroughEndOffsets" to expect 11
  messages read instead of 12. It starts at message 0 and reads up through
  message 10, which should be 11 messages.
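To make the new range logic concrete, here is a minimal, self-contained
model of the three checks and of getNextStartOffset, using plain long
offsets rather than the real OrderedSequenceNumber generics. The class and
method bodies below are an illustrative sketch, not code from this patch:

    // Illustrative model only; the real runner compares OrderedSequenceNumber values.
    class OffsetModel
    {
      private final boolean endOffsetExclusive; // true for Kafka, false for Kinesis
      private Long lastReadOffset = null;

      OffsetModel(boolean endOffsetExclusive)
      {
        this.endOffsetExclusive = endOffsetExclusive;
      }

      long getNextStartOffset(long offset)
      {
        // Kafka can compute the next offset by adding 1; Kinesis cannot cheaply know the
        // next sequence number, so it re-stores the same one and relies on exclusive starts.
        return endOffsetExclusive ? offset + 1 : offset;
      }

      // Pre-read check: is a record at "offset" still within the range we want?
      boolean isMoreToReadBeforeReadingRecord(long offset, long endOffset)
      {
        return endOffsetExclusive ? offset < endOffset : offset <= endOffset;
      }

      // Post-read check: exclusivity no longer matters; once the end has been read, we're done.
      boolean isMoreToReadAfterReadingRecord(long offset, long endOffset)
      {
        return getNextStartOffset(offset) < endOffset;
      }

      boolean isRecordAlreadyRead(long offset)
      {
        return lastReadOffset != null && offset <= lastReadOffset;
      }

      void markRead(long offset)
      {
        lastReadOffset = offset;
      }

      public static void main(String[] args)
      {
        // Kafka-like: start 0, end 10 exclusive => offsets 0..9.
        OffsetModel kafka = new OffsetModel(true);
        assert kafka.isMoreToReadBeforeReadingRecord(9, 10);
        assert !kafka.isMoreToReadAfterReadingRecord(9, 10); // done after reading 9

        // Kinesis-like singleton range: start 5, end 5 inclusive => read exactly one record.
        OffsetModel kinesis = new OffsetModel(false);
        assert kinesis.isMoreToReadBeforeReadingRecord(5, 5); // the old code skipped this read
        kinesis.markRead(5);
        assert !kinesis.isMoreToReadAfterReadingRecord(5, 5);
        assert kinesis.isRecordAlreadyRead(5);
      }
    }

The singleton-range case at the end is the off-by-one the Kinesis tests
exercise: with an inclusive end, a "curr < end" pre-read check would stop
one record early, while "curr <= end" reads the final record exactly once.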
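The restore-time seeding of lastReadOffsets can be stated the same way.
This is a standalone restatement of the corresponding hunk in
SeekableStreamIndexTaskRunner, applicable only to inclusive-end
(Kinesis-like) streams; the generic names here are illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    class RestoreSeeding
    {
      // After restoring currOffsets, decide which of them were already consumed, so the
      // next record at that offset is skipped rather than re-read.
      static <P, O> Map<P, O> seedLastReadOffsets(
          Map<P, O> restoredCurrOffsets,
          Map<P, O> startOffsets,
          Set<P> exclusiveStartPartitions
      )
      {
        final Map<P, O> lastRead = new HashMap<>();
        for (Map.Entry<P, O> entry : restoredCurrOffsets.entrySet()) {
          final boolean isAtStart = entry.getValue().equals(startOffsets.get(entry.getKey()));

          // Past the start, or an exclusive start (a prior task read it): already consumed.
          if (!isAtStart || exclusiveStartPartitions.contains(entry.getKey())) {
            lastRead.put(entry.getKey(), entry.getValue());
          }
        }
        return lastRead;
      }
    }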
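Finally, the claim that start-exclusivity must mirror end-inclusivity is
just the requirement that adjacent sequences tile the stream with no gap
and no overlap: each sequence must start exactly where its predecessor
ended, and the single isEndOffsetExclusive bit decides which side of the
shared boundary reads that offset. The second commit enforces the
"same boundary" half of this via addSequence; here is a sketch of that
invariant with illustrative names:

    import java.util.Map;

    class SequenceLinking
    {
      // Whether the shared boundary offset is read by the earlier sequence (inclusive end,
      // exclusive start) or the later one (exclusive end, inclusive start), the boundary
      // itself must be identical, or records would be dropped or double-read.
      static <P, O> void validateAdjacent(Map<P, O> priorEndOffsets, Map<P, O> nextStartOffsets)
      {
        for (Map.Entry<P, O> entry : nextStartOffsets.entrySet()) {
          final O priorEnd = priorEndOffsets.get(entry.getKey());
          if (!entry.getValue().equals(priorEnd)) {
            throw new IllegalStateException(
                "New sequence start[" + entry.getValue() + "] does not match prior end[" + priorEnd + "]"
            );
          }
        }
      }
    }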
---
 ...ementalPublishingKafkaIndexTaskRunner.java |  10 +-
 .../kafka/LegacyKafkaIndexTaskRunner.java     |  13 +-
 .../indexing/kafka/KafkaIndexTaskTest.java    | 110 +++++++
 .../kinesis/KinesisIndexTaskRunner.java       |  10 +-
 .../kinesis/KinesisIndexTaskTest.java         |  87 +++---
 .../SeekableStreamIndexTaskRunner.java        | 278 +++++++++++-------
 .../seekablestream/SequenceMetadata.java      |  15 +-
 .../common/OrderedSequenceNumber.java         |  30 ++
 8 files changed, 361 insertions(+), 192 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index ae092d5d6b62..9e38a97e1369 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -84,7 +84,7 @@ public IncrementalPublishingKafkaIndexTaskRunner(
   }
 
   @Override
-  protected Long getSequenceNumberToStoreAfterRead(@NotNull Long sequenceNumber)
+  protected Long getNextStartOffset(@NotNull Long sequenceNumber)
   {
     return sequenceNumber + 1;
   }
@@ -209,17 +209,11 @@ protected void possiblyResetDataSourceMetadata(
   }
 
   @Override
-  protected boolean isEndSequenceOffsetsExclusive()
+  protected boolean isEndOffsetExclusive()
   {
     return true;
   }
 
-  @Override
-  protected boolean isStartingSequenceOffsetsExclusive()
-  {
-    return false;
-  }
-
   @Override
   protected boolean isEndOfShard(Long seqNum)
   {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index 1082473a37dd..4104bf13f548 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -706,18 +706,11 @@ protected void possiblyResetDataSourceMetadata(
   }
 
   @Override
-  protected boolean isEndSequenceOffsetsExclusive()
+  protected boolean isEndOffsetExclusive()
   {
-    return false;
+    return true;
   }
 
-  @Override
-  protected boolean isStartingSequenceOffsetsExclusive()
-  {
-    return false;
-  }
-
-  @Override
   protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
@@ -805,7 +798,7 @@ private void requestPause()
   }
 
   @Override
-  protected Long getSequenceNumberToStoreAfterRead(Long sequenceNumber)
+  protected Long getNextStartOffset(Long sequenceNumber)
   {
     throw new UnsupportedOperationException();
   }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 461ac9e3a896..98c148de66a7 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -313,6 +313,116 @@ private static List<ProducerRecord<byte[], byte[]>> generateSinglePartitionRecords(
     );
   }
 
+  @Test(timeout = 60_000L)
+  public void testRestoreAtEndOffset() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+
+    records = generateSinglePartitionRecords(topic);
+    maxRowsPerSegment = 2;
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final KafkaIndexTask task1 = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)),
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final SeekableStreamPartitions<Integer, Long> checkpoint = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(0, 5L)
+    );
+
+    final ListenableFuture<TaskStatus> future1 = runTask(task1);
+
+    // Insert some data, but not enough for the task to finish
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 5)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    while (task1.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+
+    final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint.getPartitionSequenceNumberMap(), currentOffsets);
+
+    // Set endOffsets to persist sequences
+    task1.getRunner().setEndOffsets(ImmutableMap.of(0, 5L), false);
+
+    // Stop without publishing segment
+    task1.stopGracefully(toolboxFactory.build(task1).getConfig());
+    unlockAppenderatorBasePersistDirForTask(task1);
+
+    Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
+
+    // Start a new task
+    final KafkaIndexTask task2 = createTask(
+        task1.getId(),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)),
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future2 = runTask(task2);
+    // Wait for the task to start reading
+
+    // Insert remaining data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    // Wait for task to exit
+    Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
+
+    // Check metrics
+    Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
+    Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published segments & metadata
+    SegmentDescriptor desc1 = sd(task1, "2008/P1D", 0);
+    SegmentDescriptor desc2 = sd(task1, "2009/P1D", 0);
+    SegmentDescriptor desc3 = sd(task1, "2010/P1D", 0);
+    SegmentDescriptor desc4 = sd(task1, "2011/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
+    Assert.assertEquals(
+        new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+  }
+
   private static String getTopicName()
   {
     return "topic" + topicPostfix++;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 50326f751ce1..247f6d785491 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -78,7 +78,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner
+    final StreamPartition<String> streamPartition = StreamPartition.of(stream, shardId1);
+    recordSupplier.assign(ImmutableSet.of(streamPartition));
+    expectLastCall();
+    expect(recordSupplier.getEarliestSequenceNumber(streamPartition)).andReturn("0");
+    recordSupplier.seek(streamPartition, "2");
+    expectLastCall();
     expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 4))
         .once()
         .andReturn(Collections.emptyList())
@@ -2158,16 +2161,14 @@ public void testRestore() throws Exception
     verifyAll();
     reset(recordSupplier);
 
-    recordSupplier.assign(anyObject());
-    expectLastCall().anyTimes();
-
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-    recordSupplier.seek(anyObject(), anyString());
-    expectLastCall().anyTimes();
-
+    recordSupplier.assign(ImmutableSet.of(streamPartition));
+    expectLastCall();
+    expect(recordSupplier.getEarliestSequenceNumber(streamPartition)).andReturn("0");
+    recordSupplier.seek(streamPartition, "3");
+    expectLastCall();
     expect(recordSupplier.poll(anyLong())).andReturn(records.subList(3, 6)).once();
-
+    recordSupplier.assign(ImmutableSet.of());
+    expectLastCall();
     recordSupplier.close();
     expectLastCall();
@@ -2378,7 +2379,7 @@ public void testRestoreAfterPersistingSequences() throws Exception
     Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
-    Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(2, task2.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
@@ -2387,8 +2388,9 @@ public void testRestoreAfterPersistingSequences() throws Exception
     SegmentDescriptor desc2 = sd(task1, "2009/P1D", 0);
     SegmentDescriptor desc3 = sd(task1, "2010/P1D", 0);
     SegmentDescriptor desc4 = sd(task1, "2011/P1D", 0);
-    SegmentDescriptor desc5 = sd(task1, "2013/P1D", 0);
-    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5), publishedDescriptors());
+    SegmentDescriptor desc5 = sd(task1, "2012/P1D", 0);
+    SegmentDescriptor desc6 = sd(task1, "2013/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6), publishedDescriptors());
     Assert.assertEquals(
         new KinesisDataSourceMetadata(
             new SeekableStreamPartitions<>(stream, ImmutableMap.of(
@@ -2402,14 +2404,12 @@ public void testRestoreAfterPersistingSequences() throws Exception
   @Test(timeout = 120_000L)
   public void testRunWithPauseAndResume() throws Exception
   {
-    recordSupplier.assign(anyObject());
-    expectLastCall().anyTimes();
-
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-    recordSupplier.seek(anyObject(), anyString());
-    expectLastCall().anyTimes();
-
+    final StreamPartition<String> streamPartition = StreamPartition.of(stream, shardId1);
+    recordSupplier.assign(ImmutableSet.of(streamPartition));
+    expectLastCall();
+    expect(recordSupplier.getEarliestSequenceNumber(streamPartition)).andReturn("0");
+    recordSupplier.seek(streamPartition, "2");
+    expectLastCall();
     expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 5))
         .once()
         .andReturn(Collections.emptyList())
@@ -2476,14 +2476,8 @@ public void testRunWithPauseAndResume() throws Exception
     reset(recordSupplier);
 
-    recordSupplier.assign(anyObject());
-    expectLastCall().anyTimes();
-
-    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
-
-    recordSupplier.seek(anyObject(), anyString());
-    expectLastCall().anyTimes();
-
+    recordSupplier.assign(ImmutableSet.of());
+    expectLastCall();
     recordSupplier.close();
     expectLastCall().once();
@@ -2547,8 +2541,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
     final TreeMap<Integer, Map<String, String>> sequences = new TreeMap<>();
     // Here the sequence number is 1 meaning that one incremental handoff was done by the failed task
-    // and this task should start reading from stream 2 for partition 0
-    sequences.put(1, ImmutableMap.of(shardId1, "2"));
+    // and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
+    sequences.put(1, ImmutableMap.of(shardId1, "1"));
     final Map<String, Object> context = new HashMap<>();
     context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<String, String>>>()
     {
@@ -2696,17 +2690,14 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
     Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
     task.getRunner().setEndOffsets(currentOffsets, false);
 
-    // The task is supposed to consume remaining rows up to the offset of 13
     while (task.getRunner().getStatus() != Status.PAUSED) {
       Thread.sleep(10);
     }
     currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
     Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
-    task.getRunner().setEndOffsets(
-        ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)),
-        true
-    );
+    // Set end offsets to one past the checkpoint, simulating a replica that needs to catch up.
+    task.getRunner().setEndOffsets(ImmutableMap.of(shardId1, "10"), true);
 
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
@@ -2715,7 +2706,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
     Assert.assertEquals(2, checkpointRequestsHash.size());
 
     // Check metrics
-    Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(11, task.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
@@ -2762,7 +2753,7 @@ private ListenableFuture<TaskStatus> runTask(final Task task)
           throw new ISE("Task is not ready");
         }
       }
-      catch (Exception e) {
+      catch (Throwable e) {
         log.warn(e, "Task failed");
         return TaskStatus.failure(task.getId(), Throwables.getStackTraceAsString(e));
       }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 4fca19d12174..7e697a0a1078 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -142,6 +142,12 @@ public enum Status
   static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
 
   private final Map<PartitionIdType, SequenceOffsetType> endOffsets;
+
+  // lastReadOffsets are the last offsets that were read and processed.
+  private final ConcurrentMap<PartitionIdType, SequenceOffsetType> lastReadOffsets = new ConcurrentHashMap<>();
+
+  // currOffsets are what should become the start offsets of the next reader, if we stopped reading now. They are
+  // initialized to the start offsets when the task begins.
   private final ConcurrentMap<PartitionIdType, SequenceOffsetType> currOffsets = new ConcurrentHashMap<>();
 
   private final ConcurrentMap<PartitionIdType, SequenceOffsetType> lastPersistedOffsets = new ConcurrentHashMap<>();
@@ -192,7 +198,6 @@ public enum Status
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
   private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new ArrayList<>();
   private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();
-  private final Set<PartitionIdType> initialOffsetsSnapshot = new HashSet<>();
   private final Set<PartitionIdType> exclusiveStartingPartitions = new HashSet<>();
 
   private volatile DateTime startTime;
@@ -375,7 +380,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
       // Sanity checks.
       if (!restoredNextPartitions.getStream().equals(ioConfig.getStartPartitions().getStream())) {
         throw new ISE(
-            "WTF?! Restored stream[%s] but expected stream[%s]",
+            "WTF?! Restored topic[%s] but expected topic[%s]",
            restoredNextPartitions.getStream(),
            ioConfig.getStartPartitions().getStream()
        );
@@ -408,6 +413,21 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
       );
     }
 
+    // Initialize lastReadOffsets immediately after restoring currOffsets. This is only done when end offsets are
+    // inclusive, because the point of initializing lastReadOffsets here is so we know when to skip the start record.
+    // When end offsets are exclusive, we never skip the start record.
+    if (!isEndOffsetExclusive()) {
+      for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : currOffsets.entrySet()) {
+        final boolean isAtStart = entry.getValue().equals(
+            ioConfig.getStartPartitions().getPartitionSequenceNumberMap().get(entry.getKey())
+        );
+
+        if (!isAtStart || ioConfig.getExclusiveStartSequenceNumberPartitions().contains(entry.getKey())) {
+          lastReadOffsets.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+
     // Set up committer.
     final Supplier<Committer> committerSupplier = () -> {
       final Map<PartitionIdType, SequenceOffsetType> snapshot = ImmutableMap.copyOf(currOffsets);
@@ -450,17 +470,14 @@ public void run()
       status = Status.READING;
       Throwable caughtExceptionInner = null;
 
-      initialOffsetsSnapshot.addAll(currOffsets.keySet());
-
       exclusiveStartingPartitions.addAll(ioConfig.getExclusiveStartSequenceNumberPartitions());
 
       try {
         while (stillReading) {
           if (possiblyPause()) {
             // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
-            // partitions upon resuming. This is safe even if the end sequences have not been modified.
+            // partitions upon resuming. Don't call "seekToStartingSequence" after "assignPartitions", because there's
+            // no need to re-seek here. All we're going to be doing is dropping partitions.
             assignment = assignPartitions(recordSupplier);
             possiblyResetDataSourceMetadata(toolbox, recordSupplier, assignment, currOffsets);
-            seekToStartingSequence(recordSupplier, assignment);
 
             if (assignment.isEmpty()) {
               log.info("All partitions have been fully read");
@@ -498,34 +515,17 @@ public void run()
           SequenceMetadata sequenceToCheckpoint = null;
           for (OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record : records) {
-
-
-            // for Kafka, the end offsets are exclusive, so skip it
-            if (isEndSequenceOffsetsExclusive() &&
-                createSequenceNumber(record.getSequenceNumber()).compareTo(
-                    createSequenceNumber(endOffsets.get(record.getPartitionId()))) >= 0) {
-              continue;
-            }
-
-            // for the first message we receive, check that we were given a message with a sequenceNumber that matches
-            // our expected starting sequenceNumber
-            if (!verifyInitialRecordAndSkipExclusivePartition(record)) {
-              continue;
-            }
+            final boolean shouldProcess = verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber());
 
             log.trace(
-                "Got stream[%s] partition[%s] sequence[%s].",
+                "Got topic[%s] partition[%s] offset[%s], shouldProcess[%s].",
                record.getStream(),
                record.getPartitionId(),
-                record.getSequenceNumber()
+                record.getSequenceNumber(),
+                shouldProcess
            );
 
-            if (isEndOfShard(record.getSequenceNumber())) {
-              // shard is closed, applies to Kinesis only
-              currOffsets.put(record.getPartitionId(), record.getSequenceNumber());
-            } else if (createSequenceNumber(record.getSequenceNumber()).compareTo(
-                createSequenceNumber(endOffsets.get(record.getPartitionId()))) <= 0) {
-
+            if (shouldProcess) {
               try {
                 final List<byte[]> valueBytess = record.getData();
                 final List<InputRow> rows;
@@ -547,7 +547,7 @@ public void run()
                 if (sequenceToUse == null) {
                   throw new ISE(
-                      "WTH?! cannot find any valid sequence for record with partition [%d] and sequence [%d]. Current sequences: %s",
+                      "WTH?! cannot find any valid sequence for record with partition [%s] and offset [%s]. Current sequences: %s",
                      record.getPartitionId(),
                      record.getSequenceNumber(),
                      sequences
@@ -617,13 +617,19 @@ public void onFailure(Throwable t)
                // in kafka, we can easily get the next offset by adding 1, but for kinesis, there's no way
                // to get the next sequence number without having to make an expensive api call. So the behavior
So the behavior // here for kafka is to +1 while for kinesis we simply save the current sequence number - currOffsets.put(record.getPartitionId(), getSequenceNumberToStoreAfterRead(record.getSequenceNumber())); + lastReadOffsets.put(record.getPartitionId(), record.getSequenceNumber()); + currOffsets.put(record.getPartitionId(), getNextStartOffset(record.getSequenceNumber())); } - if ((currOffsets.get(record.getPartitionId()).equals(endOffsets.get(record.getPartitionId())) - || isEndOfShard(currOffsets.get(record.getPartitionId()))) - && assignment.remove(record.getStreamPartition())) { - log.info("Finished reading stream[%s], partition[%s].", record.getStream(), record.getPartitionId()); + // Use record.getSequenceNumber() in the moreToRead check, since currOffsets might not have been + // updated if we were skipping records for being beyond the end. + final boolean moreToReadAfterThisRecord = isMoreToReadAfterReadingRecord( + record.getSequenceNumber(), + endOffsets.get(record.getPartitionId()) + ); + + if (!moreToReadAfterThisRecord && assignment.remove(record.getStreamPartition())) { + log.info("Finished reading topic[%s], partition[%s].", record.getStream(), record.getPartitionId()); recordSupplier.assign(assignment); stillReading = !assignment.isEmpty(); } @@ -688,11 +694,18 @@ public void onFailure(Throwable t) status = Status.PUBLISHING; } - for (SequenceMetadata sequenceMetadata : sequences) { + for (int i = 0; i < sequences.size(); i++) { + final SequenceMetadata sequenceMetadata = sequences.get(i); if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) { - // this is done to prevent checks in sequence specific commit supplier from failing - sequenceMetadata.setEndOffsets(currOffsets); - sequenceMetadata.updateAssignments(this, currOffsets); + final boolean isLast = i == (sequences.size() - 1); + if (isLast) { + // Shorten endOffsets of the last sequence to match currOffsets. + sequenceMetadata.setEndOffsets(currOffsets); + } + + // Update assignments of the sequence, which should clear them. (This will be checked later, when the + // Committer is built.) 
+          sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord);
+
           publishingSequences.add(sequenceMetadata.getSequenceName());
           // persist already done in finally, so directly add to publishQueue
           publishAndRegisterHandoff(sequenceMetadata);
@@ -795,7 +808,7 @@ public void onFailure(Throwable t)
         toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
         toolbox.getDataSegmentServerAnnouncer().unannounce();
       }
-      catch (Exception e) {
+      catch (Throwable e) {
         if (caughtExceptionOuter != null) {
           caughtExceptionOuter.addSuppressed(e);
         } else {
@@ -990,7 +1003,7 @@ private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier)
       throws InterruptedException
   {
     for (SequenceMetadata sequenceMetadata : sequences) {
-      sequenceMetadata.updateAssignments(this, currOffsets);
+      sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadBeforeReadingRecord);
       if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) {
         publishingSequences.add(sequenceMetadata.getSequenceName());
         try {
@@ -1016,19 +1029,21 @@ private Set<StreamPartition<PartitionIdType>> assignPartitions(
   {
     final Set<StreamPartition<PartitionIdType>> assignment = new HashSet<>();
     for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : currOffsets.entrySet()) {
-      final SequenceOffsetType endOffset = endOffsets.get(entry.getKey());
-      if (isEndOfShard(endOffset)
-          || SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset)
-          || createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(endOffset)) < 0) {
-        assignment.add(StreamPartition.of(stream, entry.getKey()));
-      } else if (entry.getValue().equals(endOffset)) {
-        log.info("Finished reading partition[%s].", entry.getKey());
-      } else {
-        throw new ISE(
-            "WTF?! Cannot start from sequence[%,d] > endOffset[%,d]",
-            entry.getValue(),
+      final PartitionIdType partition = entry.getKey();
+      final SequenceOffsetType currOffset = entry.getValue();
+      final SequenceOffsetType endOffset = endOffsets.get(partition);
+
+      if (!isRecordAlreadyRead(partition, endOffset) && isMoreToReadBeforeReadingRecord(currOffset, endOffset)) {
+        log.info(
+            "Adding partition[%s], start[%s] -> end[%s] to assignment.",
+            partition,
+            currOffset,
             endOffset
         );
+
+        assignment.add(StreamPartition.of(stream, partition));
+      } else {
+        log.info("Finished reading partition[%s].", partition);
      }
    }
@@ -1037,6 +1052,53 @@ private Set<StreamPartition<PartitionIdType>> assignPartitions(
     return assignment;
   }
 
+  /**
+   * Returns true if the given record has already been read, based on lastReadOffsets.
+   */
+  private boolean isRecordAlreadyRead(
+      final PartitionIdType recordPartition,
+      final SequenceOffsetType recordSequenceNumber
+  )
+  {
+    final SequenceOffsetType lastReadOffset = lastReadOffsets.get(recordPartition);
+
+    if (lastReadOffset == null) {
+      return false;
+    } else {
+      return createSequenceNumber(recordSequenceNumber).compareTo(createSequenceNumber(lastReadOffset)) <= 0;
+    }
+  }
+
+  /**
+   * Returns true if, given that we want to start reading from recordSequenceNumber and end at endSequenceNumber, there
+   * is more left to read. Used in pre-read checks to determine if there is anything left to read.
+   */
+  private boolean isMoreToReadBeforeReadingRecord(
+      final SequenceOffsetType recordSequenceNumber,
+      final SequenceOffsetType endSequenceNumber
+  )
+  {
+    final int compareToEnd = createSequenceNumber(recordSequenceNumber)
+        .compareTo(createSequenceNumber(endSequenceNumber));
+
+    return isEndOffsetExclusive() ? compareToEnd < 0 : compareToEnd <= 0;
+  }
+
+  /**
+   * Returns true if, given that recordSequenceNumber has already been read and we want to end at endSequenceNumber,
+   * there is more left to read. Used in post-read checks to determine if there is anything left to read.
+   */
+  private boolean isMoreToReadAfterReadingRecord(
+      final SequenceOffsetType recordSequenceNumber,
+      final SequenceOffsetType endSequenceNumber
+  )
+  {
+    final int compareNextToEnd = createSequenceNumber(getNextStartOffset(recordSequenceNumber))
+        .compareTo(createSequenceNumber(endSequenceNumber));
+
+    // Unlike isMoreToReadBeforeReadingRecord, we don't care if the end is exclusive or not. If we read it, we're done.
+    return compareNextToEnd < 0;
+  }
 
   private void seekToStartingSequence(
       RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier,
       Set<StreamPartition<PartitionIdType>> partitions
@@ -1045,7 +1107,7 @@ private void seekToStartingSequence(
   )
   {
     for (final StreamPartition<PartitionIdType> partition : partitions) {
       final SequenceOffsetType sequence = currOffsets.get(partition.getPartitionId());
-      log.info("Seeking partition[%s] to sequence[%s].", partition.getPartitionId(), sequence);
+      log.info("Seeking partition[%s] to offset[%s].", partition.getPartitionId(), sequence);
       recordSupplier.seek(partition, sequence);
     }
   }
@@ -1104,7 +1166,7 @@ private void handleParseException(ParseException pe, OrderedPartitionableRecord record)
     if (tuningConfig.isLogParseExceptions()) {
       log.error(
           pe,
-          "Encountered parse exception on row from partition[%s] sequence[%s]",
+          "Encountered parse exception on row from partition[%s] offset[%s]",
          record.getPartitionId(),
          record.getSequenceNumber()
      );
@@ -1366,17 +1428,20 @@ public Response setEndOffsets(
     Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No Sequences found to set end sequences");
 
     final SequenceMetadata latestSequence = sequences.get(sequences.size() - 1);
-    // if a partition has not been read yet (contained in initialOffsetsSnapshot), then
-    // do not mark the starting sequence number as exclusive
-    Set<PartitionIdType> exclusivePartitions = sequenceNumbers.keySet()
-                                                              .stream()
-                                                              .filter(x -> !initialOffsetsSnapshot.contains(x)
-                                                                           || ioConfig.getExclusiveStartSequenceNumberPartitions()
-                                                                                      .contains(x))
-                                                              .collect(Collectors.toSet());
+    final Set<PartitionIdType> exclusiveStartPartitions;
+
+    if (isEndOffsetExclusive()) {
+      // When end offsets are exclusive, there's no need for marking the next sequence as having any
+      // exclusive-start partitions. It should always start from the end offsets of the prior sequence.
+      exclusiveStartPartitions = Collections.emptySet();
+    } else {
+      // When end offsets are inclusive, we must mark all partitions as exclusive-start, to avoid reading
+      // their final messages (which have already been read).
+      exclusiveStartPartitions = sequenceNumbers.keySet();
+    }
 
     if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
-         && latestSequence.getExclusiveStartPartitions().equals(exclusivePartitions)
+         && latestSequence.getExclusiveStartPartitions().equals(exclusiveStartPartitions)
          && !finish)
        || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
      log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers);
@@ -1416,19 +1481,17 @@ public Response setEndOffsets(
           log.info("Updating endOffsets from [%s] to [%s]", endOffsets, sequenceNumbers);
           endOffsets.putAll(sequenceNumbers);
         } else {
-          exclusiveStartingPartitions.addAll(exclusivePartitions);
-
           // create new sequence
+          log.info("Creating new sequence with startOffsets [%s] and endOffsets [%s]", sequenceNumbers, endOffsets);
           final SequenceMetadata newSequence = new SequenceMetadata<>(
              latestSequence.getSequenceId() + 1,
              StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
              sequenceNumbers,
              endOffsets,
              false,
-              exclusivePartitions
+              exclusiveStartPartitions
          );
          sequences.add(newSequence);
-          initialOffsetsSnapshot.addAll(sequenceNumbers.keySet());
        }
        persistSequences();
      }
@@ -1582,45 +1645,47 @@ public DateTime getStartTime(@Context final HttpServletRequest req)
     return startTime;
   }
 
-  private boolean verifyInitialRecordAndSkipExclusivePartition(
-      final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
+  /**
+   * This method does two things:
+   *
+   * 1) Verifies that the sequence numbers we read are at least as high as those read previously, and throws an
+   *    exception if not.
+   * 2) Returns false if we should skip this record, because it's either (a) the first record in a partition that we
+   *    need to be exclusive on; or (b) too late to read, i.e. past the endOffsets.
+   */
+  private boolean verifyRecordInRange(
+      final PartitionIdType partition,
+      final SequenceOffsetType recordOffset
   )
   {
     // Check only for the first record among the record batch.
-    if (initialOffsetsSnapshot.contains(record.getPartitionId())) {
-      final SequenceOffsetType currOffset = Preconditions.checkNotNull(
-          currOffsets.get(record.getPartitionId()),
-          "Current offset is null for sequenceNumber[%s] and partitionId[%s]",
-          record.getSequenceNumber(),
-          record.getPartitionId()
-      );
-      final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(
-          record.getSequenceNumber()
-      );
-      final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(
-          currOffset
-      );
-      if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
-        throw new ISE(
-            "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition[%s]",
-            record.getSequenceNumber(),
-            currOffset,
-            record.getPartitionId()
-        );
-      }
+    final SequenceOffsetType currOffset = Preconditions.checkNotNull(
+        currOffsets.get(partition),
+        "Current offset is null for offset[%s] and partition[%s]",
+        recordOffset,
+        partition
+    );
 
-      // Remove the mark to notify that this partition has been read.
-      initialOffsetsSnapshot.remove(record.getPartitionId());
+    final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(recordOffset);
+    final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(currOffset);
 
-      // check exclusive starting sequence
-      if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) {
-        log.info("Skipping starting sequenceNumber for partition[%s] marked exclusive", record.getPartitionId());
+    final int comparisonToCurrent = recordSequenceNumber.compareTo(currentSequenceNumber);
+    if (comparisonToCurrent < 0) {
+      throw new ISE(
+          "Record offset[%s] is smaller than current offset[%s] for partition[%s]",
+          recordOffset,
+          currOffset,
+          partition
+      );
+    }
 
-        return false;
-      }
+    // Check if the record has already been read.
+    if (isRecordAlreadyRead(partition, recordOffset)) {
+      return false;
     }
 
-    return true;
+    // Finally, check if this record comes before the endOffsets for this partition.
+    return isMoreToReadBeforeReadingRecord(recordSequenceNumber.get(), endOffsets.get(partition));
   }
 
   /**
@@ -1645,16 +1710,14 @@ protected abstract TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> getCheckPointsFromContext(
   ) throws IOException;
 
   /**
-   * Calculates the sequence number used to update `currentOffsets` after finished reading a record.
-   * In Kafka this returns sequenceNumeber + 1 since that's the next expected offset
-   * In Kinesis this simply returns sequenceNumber, since the sequence numbers in Kinesis are not
-   * contiguous and finding the next sequence number requires an expensive API call
+   * Calculates the sequence number used to update currOffsets after finished reading a record.
+   * This is what would become the start offsets of the next reader, if we stopped reading now.
    *
    * @param sequenceNumber the sequence number that has already been processed
   *
   * @return next sequence number to be stored
   */
-  protected abstract SequenceOffsetType getSequenceNumberToStoreAfterRead(SequenceOffsetType sequenceNumber);
+  protected abstract SequenceOffsetType getNextStartOffset(SequenceOffsetType sequenceNumber);
 
   /**
    * deserializes stored metadata into SeekableStreamPartitions
@@ -1726,14 +1789,7 @@ protected abstract void possiblyResetDataSourceMetadata(
    * In Kafka, the endOffsets are exclusive, so skip it.
    * In Kinesis the endOffsets are inclusive
    */
-  protected abstract boolean isEndSequenceOffsetsExclusive();
-
-  /**
-   * In Kafka, the startingOffsets are inclusive.
-   * In Kinesis, the startingOffsets are exclusive, except for the first
-   * partition we read from stream
-   */
-  protected abstract boolean isStartingSequenceOffsetsExclusive();
+  protected abstract boolean isEndOffsetExclusive();
 
   protected abstract TypeReference<List<SequenceMetadata>> getSequenceMetadataTypeReference();
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 7fbc800ac3d1..61bb35a7778a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -38,6 +38,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
 
 public class SequenceMetadata
 {
@@ -148,17 +149,16 @@ void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
   }
 
   void updateAssignments(
-      SeekableStreamIndexTaskRunner runner,
-      Map<PartitionIdType, SequenceOffsetType> nextPartitionOffset
+      Map<PartitionIdType, SequenceOffsetType> currOffsets,
+      BiFunction<SequenceOffsetType, SequenceOffsetType, Boolean> moreToReadFn
   )
   {
     lock.lock();
     try {
       assignments.clear();
-      nextPartitionOffset.forEach((key, value) -> {
+      currOffsets.forEach((key, value) -> {
         SequenceOffsetType endOffset = endOffsets.get(key);
-        if (SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset)
-            || runner.createSequenceNumber(endOffset).compareTo(runner.createSequenceNumber(nextPartitionOffset.get(key))) > 0) {
+        if (moreToReadFn.apply(value, endOffset)) {
           assignments.add(key);
         }
       });
@@ -188,14 +188,15 @@ boolean canHandle(
       return false;
     }
     boolean ret;
-    if (runner.isStartingSequenceOffsetsExclusive()) {
+    if (!runner.isEndOffsetExclusive()) {
+      // Inclusive endOffsets mean that we must skip the first record of any partition that has been read before.
       ret = recordOffset.compareTo(partitionStartOffset)
            >= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
     } else {
       ret = recordOffset.compareTo(partitionStartOffset) >= 0;
     }
 
-    if (runner.isEndSequenceOffsetsExclusive()) {
+    if (runner.isEndOffsetExclusive()) {
       ret &= recordOffset.compareTo(partitionEndOffset) < 0;
     } else {
       ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
index f193488240d9..74fd08d445ec 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedSequenceNumber.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.seekablestream.common;
 
+import java.util.Objects;
 
 /**
  * Represents a Kafka/Kinesis stream sequence number. Mainly used to do
@@ -51,4 +52,33 @@ public boolean isExclusive()
   {
     return isExclusive;
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OrderedSequenceNumber<?> that = (OrderedSequenceNumber<?>) o;
+    return isExclusive == that.isExclusive &&
+           Objects.equals(sequenceNumber, that.sequenceNumber);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(sequenceNumber, isExclusive);
+  }
+
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + "{" +
+           "sequenceNumber=" + sequenceNumber +
+           ", isExclusive=" + isExclusive +
+           '}';
+  }
 }

From 972eacc15d20e2b40a6d8bd2423c038ccaaa0d58 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Thu, 14 Mar 2019 21:56:46 -0700
Subject: [PATCH 2/2] Changes from code review.

---
 .../kinesis/KinesisIndexTaskTest.java         |  1 -
 .../SeekableStreamIndexTaskRunner.java        | 55 +++++++++++++------
 .../SeekableStreamPartitions.java             |  6 +-
 3 files changed, 42 insertions(+), 20 deletions(-)

diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index bddd2c5c1014..439584fd76d8 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2399,7 +2399,6 @@ public void testRunWithPauseAndResume() throws Exception
     final StreamPartition<String> streamPartition = StreamPartition.of(stream, shardId1);
     recordSupplier.assign(ImmutableSet.of(streamPartition));
     expectLastCall();
-    expect(recordSupplier.getEarliestSequenceNumber(streamPartition)).andReturn("0");
     recordSupplier.seek(streamPartition, "2");
     expectLastCall();
     expect(recordSupplier.poll(anyLong())).andReturn(records.subList(2, 5))
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 639e36451931..ee9a7e2453e7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -144,7 +144,7 @@ public enum Status
   private final Map<PartitionIdType, SequenceOffsetType> endOffsets;
 
   // lastReadOffsets are the last offsets that were read and processed.
-  private final ConcurrentMap<PartitionIdType, SequenceOffsetType> lastReadOffsets = new ConcurrentHashMap<>();
+  private final Map<PartitionIdType, SequenceOffsetType> lastReadOffsets = new HashMap<>();
 
   // currOffsets are what should become the start offsets of the next reader, if we stopped reading now. They are
   // initialized to the start offsets when the task begins.
@@ -198,7 +198,6 @@ public enum Status
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
   private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new ArrayList<>();
   private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();
-  private final Set<PartitionIdType> exclusiveStartingPartitions = new HashSet<>();
 
   private volatile DateTime startTime;
   private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
@@ -277,7 +276,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
         Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = sequenceOffsets.next();
         while (sequenceOffsets.hasNext()) {
           Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = sequenceOffsets.next();
-          sequences.add(new SequenceMetadata<>(
+          addSequence(new SequenceMetadata<>(
              previous.getKey(),
              StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
              previous.getValue(),
@@ -288,7 +287,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
           previous = current;
           exclusive = true;
         }
-        sequences.add(new SequenceMetadata<>(
+        addSequence(new SequenceMetadata<>(
            previous.getKey(),
            StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
            previous.getValue(),
@@ -297,7 +296,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
            exclusive ? previous.getValue().keySet() : null
        ));
      } else {
-        sequences.add(new SequenceMetadata<>(
+        addSequence(new SequenceMetadata<>(
            0,
            StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
            ioConfig.getStartPartitions().getPartitionSequenceNumberMap(),
@@ -380,7 +379,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
       // Sanity checks.
       if (!restoredNextPartitions.getStream().equals(ioConfig.getStartPartitions().getStream())) {
         throw new ISE(
-            "WTF?! Restored topic[%s] but expected topic[%s]",
+            "WTF?! Restored stream[%s] but expected stream[%s]",
            restoredNextPartitions.getStream(),
            ioConfig.getStartPartitions().getStream()
        );
@@ -518,7 +517,7 @@ public void run()
             final boolean shouldProcess = verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber());
 
             log.trace(
-                "Got topic[%s] partition[%s] offset[%s], shouldProcess[%s].",
+                "Got stream[%s] partition[%s] sequenceNumber[%s], shouldProcess[%s].",
                record.getStream(),
                record.getPartitionId(),
                record.getSequenceNumber(),
                shouldProcess
            );
@@ -547,7 +546,7 @@ public void run()
                 if (sequenceToUse == null) {
                   throw new ISE(
-                      "WTH?! cannot find any valid sequence for record with partition [%s] and offset [%s]. Current sequences: %s",
+                      "WTH?! cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s",
Current sequences: %s", record.getPartitionId(), record.getSequenceNumber(), sequences @@ -629,7 +628,7 @@ public void onFailure(Throwable t) ); if (!moreToReadAfterThisRecord && assignment.remove(record.getStreamPartition())) { - log.info("Finished reading topic[%s], partition[%s].", record.getStream(), record.getPartitionId()); + log.info("Finished reading stream[%s], partition[%s].", record.getStream(), record.getPartitionId()); recordSupplier.assign(assignment); stillReading = !assignment.isEmpty(); } @@ -925,7 +924,7 @@ public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata) @Override public void onFailure(Throwable t) { - log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata); + log.error(t, "Error while publishing segments for sequenceNumber[%s]", sequenceMetadata); handoffFuture.setException(t); } } @@ -1052,6 +1051,30 @@ private Set> assignPartitions( return assignment; } + private void addSequence(final SequenceMetadata sequenceMetadata) + { + // Sanity check that the start of the new sequence matches up with the end of the prior sequence. + for (Map.Entry entry : sequenceMetadata.getStartOffsets().entrySet()) { + final PartitionIdType partition = entry.getKey(); + final SequenceOffsetType startOffset = entry.getValue(); + + if (!sequences.isEmpty()) { + final SequenceOffsetType priorOffset = sequences.get(sequences.size() - 1).endOffsets.get(partition); + + if (!startOffset.equals(priorOffset)) { + throw new ISE( + "New sequence startOffset[%s] does not equal expected prior offset[%s]", + startOffset, + priorOffset + ); + } + } + } + + // Actually do the add. + sequences.add(sequenceMetadata); + } + /** * Returns true if the given record has already been read, based on lastReadOffsets. */ @@ -1107,7 +1130,7 @@ private void seekToStartingSequence( { for (final StreamPartition partition : partitions) { final SequenceOffsetType sequence = currOffsets.get(partition.getPartitionId()); - log.info("Seeking partition[%s] to offset[%s].", partition.getPartitionId(), sequence); + log.info("Seeking partition[%s] to sequenceNumber[%s].", partition.getPartitionId(), sequence); recordSupplier.seek(partition, sequence); } } @@ -1166,7 +1189,7 @@ private void handleParseException(ParseException pe, OrderedPartitionableRecord if (tuningConfig.isLogParseExceptions()) { log.error( pe, - "Encountered parse exception on row from partition[%s] offset[%s]", + "Encountered parse exception on row from partition[%s] sequenceNumber[%s]", record.getPartitionId(), record.getSequenceNumber() ); @@ -1491,7 +1514,7 @@ public Response setEndOffsets( false, exclusiveStartPartitions ); - sequences.add(newSequence); + addSequence(newSequence); } persistSequences(); } @@ -1658,10 +1681,10 @@ private boolean verifyRecordInRange( final SequenceOffsetType recordOffset ) { - // Check only for the first record among the record batch. + // Verify that the record is at least as high as its currOffset. 
     final SequenceOffsetType currOffset = Preconditions.checkNotNull(
         currOffsets.get(partition),
-        "Current offset is null for offset[%s] and partition[%s]",
+        "Current offset is null for sequenceNumber[%s] and partition[%s]",
        recordOffset,
        partition
    );
@@ -1672,7 +1695,7 @@ private boolean verifyRecordInRange(
     final int comparisonToCurrent = recordSequenceNumber.compareTo(currentSequenceNumber);
     if (comparisonToCurrent < 0) {
       throw new ISE(
-          "Record offset[%s] is smaller than current offset[%s] for partition[%s]",
+          "Record sequenceNumber[%s] is smaller than current sequenceNumber[%s] for partition[%s]",
          recordOffset,
          currOffset,
          partition
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
index 28f5dde79cb2..dc3ff87aff15 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamPartitions.java
@@ -143,9 +143,9 @@ public int hashCode()
   @Override
   public String toString()
   {
-    return "SeekableStreamPartitions{" +
-           "stream/topic='" + stream + '\'' +
-           ", partitionSequenceNumberMap/partitionOffsetMap=" + partitionIdToSequenceNumberMap +
+    return getClass().getSimpleName() + "{" +
+           "stream='" + stream + '\'' +
+           ", partitionSequenceNumberMap=" + partitionIdToSequenceNumberMap +
            '}';
   }
 }