diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 461ac9e3a896..6668ddd0c5e7 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -86,6 +86,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ListenableFutures; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -910,11 +911,11 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception final SeekableStreamPartitions checkpoint1 = new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)); final SeekableStreamPartitions checkpoint2 = - new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L)); + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 9L)); final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)); - final KafkaIndexTask task = createTask( + final KafkaIndexTask normalReplica = createTask( null, new KafkaIndexTaskIOConfig( 0, @@ -928,34 +929,68 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null ) ); - final ListenableFuture future = runTask(task); - while (task.getRunner().getStatus() != Status.PAUSED) { + final KafkaIndexTask staleReplica = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + baseSequenceName, + startPartitions, + endPartitions, + consumerProps, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null + ) + ); + + final ListenableFuture normalReplicaFuture = runTask(normalReplica); + // Simulating one replica is slower than the other + final ListenableFuture staleReplicaFuture = ListenableFutures.transformAsync( + taskExec.submit(() -> { + Thread.sleep(1000); + return staleReplica; + }), + this::runTask + ); + + while (normalReplica.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + staleReplica.getRunner().pause(); + while (staleReplica.getRunner().getStatus() != Status.PAUSED) { Thread.sleep(10); } - Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + Map currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets()); Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets); - // Simulating the case when another replica has consumed up to the offset of 8 - task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false); + normalReplica.getRunner().setEndOffsets(currentOffsets, false); + staleReplica.getRunner().setEndOffsets(currentOffsets, false); - // The task is supposed to consume remaining rows up to the offset of 13 - while (task.getRunner().getStatus() != Status.PAUSED) { + while (normalReplica.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + while (staleReplica.getRunner().getStatus() != Status.PAUSED) { Thread.sleep(10); } - currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); + currentOffsets = ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets()); Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); - task.getRunner().setEndOffsets( - ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L), - true - ); + normalReplica.getRunner().setEndOffsets(currentOffsets, true); + staleReplica.getRunner().setEndOffsets(currentOffsets, true); - Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, normalReplicaFuture.get().getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, staleReplicaFuture.get().getStatusCode()); - // processed count would be 8 if it stopped at it's current offsets - Assert.assertEquals(13, task.getRunner().getRowIngestionMeters().getProcessed()); - Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); - Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(9, normalReplica.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getThrownAway()); + + Assert.assertEquals(9, staleReplica.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, staleReplica.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, staleReplica.getRunner().getRowIngestionMeters().getThrownAway()); } @Test(timeout = 60_000L) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 857b4a76235c..2aded4a410b2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -91,6 +91,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.concurrent.ListenableFutures; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -133,8 +134,6 @@ import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; -import org.apache.druid.segment.loading.SegmentLoaderConfig; -import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier; @@ -2557,6 +2556,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception final KinesisIndexTask task = createTask( "task1", + DATA_SCHEMA, new KinesisIndexTaskIOConfig( null, "sequence0", @@ -2616,125 +2616,147 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception final String baseSequenceName = "sequence0"; // as soon as any segment has more than one record, incremental publishing should happen maxRowsPerSegment = 2; - maxRecordsPerPoll = 1; - recordSupplier.assign(anyObject()); + final KinesisRecordSupplier recordSupplier1 = mock(KinesisRecordSupplier.class); + recordSupplier1.assign(anyObject()); expectLastCall().anyTimes(); - - expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); - - recordSupplier.seek(anyObject(), anyString()); + expect(recordSupplier1.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); + recordSupplier1.seek(anyObject(), anyString()); expectLastCall().anyTimes(); - - expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5)) - .once() - .andReturn(records.subList(4, 10)) - .once() - .andReturn(records.subList(9, 15)) - .once(); - - recordSupplier.close(); + expect(recordSupplier1.poll(anyLong())).andReturn(records.subList(0, 5)) + .once() + .andReturn(records.subList(4, 10)) + .once(); + recordSupplier1.close(); + expectLastCall().once(); + final KinesisRecordSupplier recordSupplier2 = mock(KinesisRecordSupplier.class); + recordSupplier2.assign(anyObject()); + expectLastCall().anyTimes(); + expect(recordSupplier2.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes(); + recordSupplier2.seek(anyObject(), anyString()); + expectLastCall().anyTimes(); + expect(recordSupplier2.poll(anyLong())).andReturn(records.subList(0, 5)) + .once() + .andReturn(records.subList(4, 10)) + .once(); + recordSupplier2.close(); expectLastCall().once(); replayAll(); final SeekableStreamPartitions startPartitions = new SeekableStreamPartitions<>( stream, - ImmutableMap.of( - shardId1, - "0" - ) + ImmutableMap.of(shardId1, "0") ); final SeekableStreamPartitions checkpoint1 = new SeekableStreamPartitions<>( stream, - ImmutableMap.of( - shardId1, - "4" - ) + ImmutableMap.of(shardId1, "4") ); final SeekableStreamPartitions checkpoint2 = new SeekableStreamPartitions<>( stream, - ImmutableMap.of( - shardId1, - "9" - ) + ImmutableMap.of(shardId1, "9") ); final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( stream, - ImmutableMap.of( - shardId1, - "14" - ) + ImmutableMap.of(shardId1, "100") // simulating unlimited ); - final KinesisIndexTask task = createTask( + final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig( null, - new KinesisIndexTaskIOConfig( - null, - baseSequenceName, - startPartitions, - endPartitions, - true, - null, - null, - "awsEndpoint", - null, - null, - null, - null, - null, - false - ) + baseSequenceName, + startPartitions, + endPartitions, + true, + null, + null, + "awsEndpoint", + null, + null, + null, + null, + null, + false ); - final ListenableFuture future = runTask(task); - while (task.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) { + final KinesisIndexTask normalReplica = createTask( + null, + DATA_SCHEMA, + ioConfig, + null + ); + ((TestableKinesisIndexTask) normalReplica).setLocalSupplier(recordSupplier1); + final KinesisIndexTask staleReplica = createTask( + null, + DATA_SCHEMA, + ioConfig, + null + ); + ((TestableKinesisIndexTask) staleReplica).setLocalSupplier(recordSupplier2); + final ListenableFuture normalReplicaFuture = runTask(normalReplica); + // Simulating one replica is slower than the other + final ListenableFuture staleReplicaFuture = ListenableFutures.transformAsync( + taskExec.submit(() -> { + Thread.sleep(1000); + return staleReplica; + }), + this::runTask + ); + + while (normalReplica.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + staleReplica.getRunner().pause(); + while (staleReplica.getRunner().getStatus() != Status.PAUSED) { Thread.sleep(10); } - Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + Map currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets()); Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets); - task.getRunner().setEndOffsets(currentOffsets, false); - // The task is supposed to consume remaining rows up to the offset of 13 - while (task.getRunner().getStatus() != Status.PAUSED) { + normalReplica.getRunner().setEndOffsets(currentOffsets, false); + staleReplica.getRunner().setEndOffsets(currentOffsets, false); + + while (normalReplica.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + while (staleReplica.getRunner().getStatus() != Status.PAUSED) { Thread.sleep(10); } - currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + currentOffsets = ImmutableMap.copyOf(normalReplica.getRunner().getCurrentOffsets()); + Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); + currentOffsets = ImmutableMap.copyOf(staleReplica.getRunner().getCurrentOffsets()); Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets); - task.getRunner().setEndOffsets( - ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)), - true - ); + normalReplica.getRunner().setEndOffsets(currentOffsets, true); + staleReplica.getRunner().setEndOffsets(currentOffsets, true); - Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, normalReplicaFuture.get().getStatusCode()); + Assert.assertEquals(TaskState.SUCCESS, staleReplicaFuture.get().getStatusCode()); verifyAll(); Assert.assertEquals(2, checkpointRequestsHash.size()); // Check metrics - Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed()); - Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable()); - Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway()); + Assert.assertEquals(10, normalReplica.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(0, normalReplica.getRunner().getRowIngestionMeters().getThrownAway()); // Check published metadata final Set descriptors = new HashSet<>(); - descriptors.add(sd(task, "2008/P1D", 0)); - descriptors.add(sd(task, "2008/P1D", 1)); - descriptors.add(sd(task, "2009/P1D", 0)); - descriptors.add(sd(task, "2010/P1D", 0)); - descriptors.add(sd(task, "2010/P1D", 1)); - descriptors.add(sd(task, "2011/P1D", 0)); - descriptors.add(sd(task, "2011/P1D", 1)); - descriptors.add(sd(task, "2012/P1D", 0)); - descriptors.add(sd(task, "2013/P1D", 0)); + descriptors.add(sd(normalReplica, "2008/P1D", 0)); + descriptors.add(sd(normalReplica, "2009/P1D", 0)); + descriptors.add(sd(normalReplica, "2010/P1D", 0)); + descriptors.add(sd(normalReplica, "2010/P1D", 1)); + descriptors.add(sd(normalReplica, "2011/P1D", 0)); + descriptors.add(sd(normalReplica, "2011/P1D", 1)); + descriptors.add(sd(normalReplica, "2012/P1D", 0)); + descriptors.add(sd(normalReplica, "2013/P1D", 0)); Assert.assertEquals(descriptors, publishedDescriptors()); Assert.assertEquals( new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of( shardId1, - "10" + "9" ))), metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) ); @@ -2791,26 +2813,27 @@ private KinesisIndexTask createTask( final KinesisIndexTaskIOConfig ioConfig ) { - return createTask(taskId, DATA_SCHEMA, ioConfig); + return createTask(taskId, DATA_SCHEMA, ioConfig, null); } private KinesisIndexTask createTask( final String taskId, - final KinesisIndexTaskIOConfig ioConfig, - final Map context + final DataSchema dataSchema, + final KinesisIndexTaskIOConfig ioConfig ) { - return createTask(taskId, DATA_SCHEMA, ioConfig, context); + return createTask(taskId, dataSchema, ioConfig, null); } private KinesisIndexTask createTask( final String taskId, final DataSchema dataSchema, - final KinesisIndexTaskIOConfig ioConfig + final KinesisIndexTaskIOConfig ioConfig, + @Nullable final Map context ) { final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig( - 1000, + maxRowsInMemory, null, maxRowsPerSegment, maxTotalRows, @@ -2822,11 +2845,11 @@ private KinesisIndexTask createTask( reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - skipAvailabilityCheck, + true, + null, null, null, null, - 5000, null, null, logParseExceptions, @@ -2835,58 +2858,20 @@ private KinesisIndexTask createTask( maxRecordsPerPoll, intermediateHandoffPeriod ); - final Map context = null; - final KinesisIndexTask task = new TestableKinesisIndexTask( - taskId, - null, - cloneDataSchema(dataSchema), - tuningConfig, - ioConfig, - context, - null, - null, - rowIngestionMetersFactory, - null - ); - - return task; + return createTask(taskId, dataSchema, ioConfig, tuningConfig, context); } - private KinesisIndexTask createTask( final String taskId, final DataSchema dataSchema, final KinesisIndexTaskIOConfig ioConfig, - final Map context + final KinesisIndexTaskTuningConfig tuningConfig, + @Nullable final Map context ) { - final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig( - maxRowsInMemory, - null, - maxRowsPerSegment, - maxTotalRows, - new Period("P1Y"), - null, - null, - null, - true, - reportParseExceptions, - handoffConditionTimeout, - resetOffsetAutomatically, - true, - null, - null, - null, - null, - null, - null, - logParseExceptions, - maxParseExceptions, - maxSavedParseExceptions, - maxRecordsPerPoll, - intermediateHandoffPeriod - ); - context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true); + if (context != null) { + context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true); + } final KinesisIndexTask task = new TestableKinesisIndexTask( taskId, @@ -3059,14 +3044,6 @@ public void close() final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig(); dataSegmentPusherConfig.storageDirectory = getSegmentDirectory(); final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig); - SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig() - { - @Override - public List getLocations() - { - return new ArrayList<>(); - } - }; toolboxFactory = new TaskToolboxFactory( taskConfig, taskActionClientFactory, @@ -3242,8 +3219,10 @@ private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOExcep @JsonTypeName("index_kinesis") private static class TestableKinesisIndexTask extends KinesisIndexTask { + private KinesisRecordSupplier localSupplier; + @JsonCreator - public TestableKinesisIndexTask( + private TestableKinesisIndexTask( @JsonProperty("id") String id, @JsonProperty("resource") TaskResource taskResource, @JsonProperty("dataSchema") DataSchema dataSchema, @@ -3270,10 +3249,15 @@ public TestableKinesisIndexTask( ); } + private void setLocalSupplier(KinesisRecordSupplier recordSupplier) + { + this.localSupplier = recordSupplier; + } + @Override protected KinesisRecordSupplier newTaskRecordSupplier() { - return recordSupplier; + return localSupplier == null ? recordSupplier : localSupplier; } }