From 75d46d5a9b2831288ec692fea51dce54c3dfc653 Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Sat, 2 Feb 2019 17:16:20 +0800 Subject: [PATCH 1/2] fix kafka index task doesn't resume when receive duplicate request --- .../indexing/seekablestream/SeekableStreamIndexTaskRunner.java | 1 + 1 file changed, 1 insertion(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 8e502f0387eb..f253734cee01 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1402,6 +1402,7 @@ public Response setEndOffsets( exclusivePartitions) && !finish) || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) { log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers); + resume(); return Response.ok(sequenceNumbers).build(); } else if (latestSequence.isCheckpointed()) { return Response.status(Response.Status.BAD_REQUEST) From ebf518e65b375268a92bb75b95a2afd00e851370 Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Tue, 12 Feb 2019 18:24:14 +0800 Subject: [PATCH 2/2] add unit test --- .../indexing/kafka/KafkaIndexTaskTest.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 7bf83ddd3a03..2130dffc4545 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2128,6 +2128,49 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } + @Test(timeout = 60_000L) + public void testRunWithDuplicateRequest() throws Exception + { + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : records) { + kafkaProducer.send(record).get(); + } + } + + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + false + ) + ); + + runTask(task); + + while (!task.getRunner().getStatus().equals(Status.READING)) { + Thread.sleep(20); + } + + // first setEndOffsets request + task.getRunner().pause(); + task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + + // duplicate setEndOffsets request + task.getRunner().pause(); + task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + } + private ListenableFuture runTask(final Task task) { try {