diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java index c82700d96260..c822088ebc88 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java @@ -959,7 +959,7 @@ public Map getCurrentOffsets(@Context final HttpServletRequest re } @Override - public Map getCurrentOffsets() + public ConcurrentMap getCurrentOffsets() { return nextOffsets; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index b19bf8ad3716..ef771857d27d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -226,8 +226,6 @@ public class KafkaIndexTaskTest private File reportsFile; private RowIngestionMetersFactory rowIngestionMetersFactory; - private int handoffCount = 0; - // This should be removed in versions greater that 0.12.x // isIncrementalHandoffSupported should always be set to true in those later versions @Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}") @@ -877,7 +875,14 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")), new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")), new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")) + new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0")) ); final String baseSequenceName = "sequence0"; @@ -904,9 +909,13 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception topic, ImmutableMap.of(0, 5L) ); + final SeekableStreamPartitions checkpoint2 = new SeekableStreamPartitions<>( + topic, + ImmutableMap.of(0, 12L) + ); final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( topic, - ImmutableMap.of(0, 7L) + ImmutableMap.of(0, Long.MAX_VALUE) ); final KafkaIndexTask task = createTask( @@ -927,17 +936,28 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception while (task.getRunner().getStatus() != Status.PAUSED) { Thread.sleep(10); } - final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); - Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets)); + Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets); + + // Simulating the case when another replica has consumed up to the offset of 8 + task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false); - // actual checkpoint offset is 5, but simulating behavior of publishing set end offset call, to ensure this task - // will continue reading through the end offset of the checkpointed sequence - task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true); + // The task is supposed to consume remaining rows up to the offset of 13 + while (task.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); + + task.getRunner().setEndOffsets( + ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L), + true + ); Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - // processed count would be 5 if it stopped at it's current offsets - Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed()); + // processed count would be 8 if it stopped at it's current offsets + Assert.assertEquals(13, task.getRunner().getRowIngestionMeters().getProcessed()); Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 78ce481980bc..98a3bee494e7 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -176,6 +176,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -639,7 +640,6 @@ public void testIncrementalHandOff() throws Exception Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7)); } - @Test(timeout = 120_000L) public void testIncrementalHandOffMaxTotalRows() throws Exception { @@ -2277,7 +2277,7 @@ public void testRunWithPauseAndResume() throws Exception verifyAll(); - Map currentOffsets = task.getRunner().getCurrentOffsets(); + ConcurrentMap currentOffsets = task.getRunner().getCurrentOffsets(); try { future.get(10, TimeUnit.SECONDS); @@ -2423,6 +2423,154 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } + @Test(timeout = 5000L) + public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception + { + final List> records = ImmutableList.of( + new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0")) + ); + + final String baseSequenceName = "sequence0"; + // as soon as any segment has more than one record, incremental publishing should happen + maxRowsPerSegment = 2; + maxRecordsPerPoll = 1; + + recordSupplier.assign(anyObject()); + expectLastCall().anyTimes(); + + expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); + + recordSupplier.seek(anyObject(), anyString()); + expectLastCall().anyTimes(); + + expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5)) + .once() + .andReturn(records.subList(4, 10)) + .once() + .andReturn(records.subList(9, 15)) + .once(); + + recordSupplier.close(); + expectLastCall().once(); + + replayAll(); + + final SeekableStreamPartitions startPartitions = new SeekableStreamPartitions<>( + stream, + ImmutableMap.of( + shardId1, + "0" + ) + ); + + final SeekableStreamPartitions checkpoint1 = new SeekableStreamPartitions<>( + stream, + ImmutableMap.of( + shardId1, + "4" + ) + ); + + final SeekableStreamPartitions checkpoint2 = new SeekableStreamPartitions<>( + stream, + ImmutableMap.of( + shardId1, + "9" + ) + ); + + final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( + stream, + ImmutableMap.of( + shardId1, + "14" + ) + ); + final KinesisIndexTask task = createTask( + null, + new KinesisIndexTaskIOConfig( + null, + baseSequenceName, + startPartitions, + endPartitions, + true, + null, + null, + "awsEndpoint", + null, + null, + null, + null, + null, + false + ) + ); + final ListenableFuture future = runTask(task); + while (task.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) { + Thread.sleep(10); + } + Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets); + task.getRunner().setEndOffsets(currentOffsets, false); + + // The task is supposed to consume remaining rows up to the offset of 13 + while (task.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); + + task.getRunner().setEndOffsets( + ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)), + true + ); + + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + + verifyAll(); + + Assert.assertEquals(2, checkpointRequestsHash.size()); + + // Check metrics + Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + + // Check published metadata + final Set descriptors = new HashSet<>(); + descriptors.add(sd(task, "2008/P1D", 0)); + descriptors.add(sd(task, "2008/P1D", 1)); + descriptors.add(sd(task, "2009/P1D", 0)); + descriptors.add(sd(task, "2010/P1D", 0)); + descriptors.add(sd(task, "2010/P1D", 1)); + descriptors.add(sd(task, "2011/P1D", 0)); + descriptors.add(sd(task, "2011/P1D", 1)); + descriptors.add(sd(task, "2012/P1D", 0)); + descriptors.add(sd(task, "2013/P1D", 0)); + Assert.assertEquals(descriptors, publishedDescriptors()); + Assert.assertEquals( + new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of( + shardId1, + "10" + ))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + } + private ListenableFuture runTask(final Task task) { try { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index d4659bf57b8f..833cc172787a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -61,7 +61,7 @@ import java.util.Map; -public abstract class SeekableStreamIndexTask +public abstract class SeekableStreamIndexTask extends AbstractTask implements ChatHandler { public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index b2b906ee57ce..7eee9dcf259e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -130,7 +130,7 @@ * @param Partition Number Type * @param Sequence Number Type */ -public abstract class SeekableStreamIndexTaskRunner implements ChatHandler +public abstract class SeekableStreamIndexTaskRunner implements ChatHandler { public enum Status { @@ -196,7 +196,7 @@ public enum Status private final Set publishingSequences = Sets.newConcurrentHashSet(); private final List> publishWaitList = new ArrayList<>(); private final List> handOffWaitList = new ArrayList<>(); - private final Map initialOffsetsSnapshot = new HashMap<>(); + private final Set initialOffsetsSnapshot = new HashSet<>(); private final Set exclusiveStartingPartitions = new HashSet<>(); private volatile DateTime startTime; @@ -454,7 +454,7 @@ public void run() status = Status.READING; Throwable caughtExceptionInner = null; - initialOffsetsSnapshot.putAll(currOffsets); + initialOffsetsSnapshot.addAll(currOffsets.keySet()); exclusiveStartingPartitions.addAll(ioConfig.getExclusiveStartSequenceNumberPartitions()); try { @@ -490,7 +490,6 @@ public void run() maybePersistAndPublishSequences(committerSupplier); - // calling getRecord() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException // are handled in the subclasses. List> records = getRecords( @@ -512,9 +511,9 @@ public void run() continue; } - // for the first message we receive, check that we were given a message with a sequenceNumber that matches our - // expected starting sequenceNumber - if (!verifyInitialRecordAndSkipExclusivePartition(record, initialOffsetsSnapshot)) { + // for the first message we receive, check that we were given a message with a sequenceNumber that matches + // our expected starting sequenceNumber + if (!verifyInitialRecordAndSkipExclusivePartition(record)) { continue; } @@ -1281,7 +1280,7 @@ public Map getCurrentOffsets(@Context final return getCurrentOffsets(); } - public Map getCurrentOffsets() + public ConcurrentMap getCurrentOffsets() { return currOffsets; } @@ -1384,14 +1383,15 @@ public Response setEndOffsets( // do not mark the starting sequence number as exclusive Set exclusivePartitions = sequenceNumbers.keySet() .stream() - .filter(x -> !initialOffsetsSnapshot.containsKey(x) + .filter(x -> !initialOffsetsSnapshot.contains(x) || ioConfig.getExclusiveStartSequenceNumberPartitions() .contains(x)) .collect(Collectors.toSet()); - if ((latestSequence.getStartOffsets().equals(sequenceNumbers) && latestSequence.exclusiveStartPartitions.equals( - exclusivePartitions) && !finish) || - (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) { + if ((latestSequence.getStartOffsets().equals(sequenceNumbers) + && latestSequence.exclusiveStartPartitions.equals(exclusivePartitions) + && !finish) + || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) { log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers); resume(); return Response.ok(sequenceNumbers).build(); @@ -1442,7 +1442,7 @@ public Response setEndOffsets( exclusivePartitions ); sequences.add(newSequence); - initialOffsetsSnapshot.putAll(sequenceNumbers); + initialOffsetsSnapshot.addAll(sequenceNumbers.keySet()); } persistSequences(); } @@ -1882,33 +1882,35 @@ TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTr } private boolean verifyInitialRecordAndSkipExclusivePartition( - final OrderedPartitionableRecord record, - final Map intialSequenceSnapshot + final OrderedPartitionableRecord record ) { - if (intialSequenceSnapshot.containsKey(record.getPartitionId())) { - if (record.getSequenceNumber().compareTo(intialSequenceSnapshot.get(record.getPartitionId())) < 0) { - throw new ISE( - "Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]", - record.getSequenceNumber(), - intialSequenceSnapshot.get(record.getPartitionId()), - record.getPartitionId() + // Check only for the first record among the record batch. + if (initialOffsetsSnapshot.contains(record.getPartitionId())) { + final SequenceOffsetType currOffset = currOffsets.get(record.getPartitionId()); + if (currOffset != null) { + final OrderedSequenceNumber recordSequenceNumber = createSequenceNumber( + record.getSequenceNumber() + ); + final OrderedSequenceNumber currentSequenceNumber = createSequenceNumber( + currOffset ); + if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) { + throw new ISE( + "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition [%s]", + record.getSequenceNumber(), + currOffset, + record.getPartitionId() + ); + } } - log.info( - "Verified starting sequenceNumber [%s] for partition [%s]", - record.getSequenceNumber(), record.getPartitionId() - ); - - intialSequenceSnapshot.remove(record.getPartitionId()); - if (intialSequenceSnapshot.isEmpty()) { - log.info("Verified starting sequences for all partitions"); - } + // Remove the mark to notify that this partition has been read. + initialOffsetsSnapshot.remove(record.getPartitionId()); // check exclusive starting sequence if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) { - log.info("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId()); + log.warn("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId()); return false; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java index a122d9e15213..4dd653e760ff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java @@ -35,7 +35,7 @@ * @param partition id * @param sequence number */ -public class OrderedPartitionableRecord +public class OrderedPartitionableRecord { private final String stream; private final PartitionIdType partitionId; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java index 3a6e87ed3b24..d9e599da0c80 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java @@ -36,7 +36,7 @@ * @param Sequence Number Type */ @Beta -public interface RecordSupplier extends Closeable +public interface RecordSupplier extends Closeable { /** * assigns the given partitions to this RecordSupplier diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 4c6509d31f82..25250ac0487c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -119,7 +119,7 @@ * @param the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type * @param the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers */ -public abstract class SeekableStreamSupervisor +public abstract class SeekableStreamSupervisor implements Supervisor { public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";