diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 04d3802a66cb..b7b389668a46 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2127,6 +2127,49 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } + @Test(timeout = 60_000L) + public void testRunWithDuplicateRequest() throws Exception + { + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : records) { + kafkaProducer.send(record).get(); + } + } + + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + false + ) + ); + + runTask(task); + + while (!task.getRunner().getStatus().equals(Status.READING)) { + Thread.sleep(20); + } + + // first setEndOffsets request + task.getRunner().pause(); + task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + + // duplicate setEndOffsets request + task.getRunner().pause(); + task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + } + private ListenableFuture runTask(final Task task) { try { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 857264965f50..c86a2b509c0c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1403,6 +1403,7 @@ public Response setEndOffsets( exclusivePartitions) && !finish) || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) { log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers); + resume(); return Response.ok(sequenceNumbers).build(); } else if (latestSequence.isCheckpointed()) { return Response.status(Response.Status.BAD_REQUEST)