From ed46ed37a71e566be7212f9841a225745a271b40 Mon Sep 17 00:00:00 2001
From: Jonathan Wei
Date: Wed, 13 Mar 2019 23:36:14 -0700
Subject: [PATCH] Fix KafkaRecordSupplier assign (#7260)

* Fix KafkaRecordSupplier assign

* TeamCity fix
---
 .../indexing/kafka/KafkaRecordSupplier.java   |  1 -
 .../indexing/kafka/KafkaIndexTaskTest.java    | 59 +++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 935404cbc7c2..60aea3c292aa 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -68,7 +68,6 @@ public void assign(Set<StreamPartition<Integer>> streamPartitions)
         .stream()
         .map(x -> new TopicPartition(x.getStream(), x.getPartitionId()))
         .collect(Collectors.toSet()));
-    seekToEarliest(streamPartitions);
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index a2224e5fde78..d2f737359e3a 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2295,6 +2295,65 @@ public void testRunWithDuplicateRequest() throws Exception
     Assert.assertEquals(Status.READING, task.getRunner().getStatus());
   }
 
+  @Test(timeout = 60_000L)
+  public void testCanStartFromLaterThanEarliestOffset() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+    final String baseSequenceName = "sequence0";
+    maxRowsPerSegment = Integer.MAX_VALUE;
+    maxTotalRows = null;
+
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            0L,
+            1,
+            1L
+        )
+    );
+
+    final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            10L,
+            1,
+            2L
+        )
+    );
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {