diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 6c3d053a3bb5..be25c49eba64 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -69,7 +69,6 @@ public void assign(Set<StreamPartition<Integer>> streamPartitions)
         .stream()
         .map(x -> new TopicPartition(x.getStream(), x.getPartitionId()))
         .collect(Collectors.toSet()));
-    seekToEarliest(streamPartitions);
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index a0caebc7dee6..c4c446dac096 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2221,6 +2221,69 @@ public void testRunTransactionModeRollback() throws Exception
     Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4));
   }
 
+  @Test(timeout = 60_000L)
+  public void testCanStartFromLaterThanEarliestOffset() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+    final String baseSequenceName = "sequence0";
+    maxRowsPerSegment = Integer.MAX_VALUE;
+    maxTotalRows = null;
+
+    // Insert data
+    int numToAdd = records.size();
+
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            0L,
+            1,
+            1L
+        )
+    );
+
+    final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            10L,
+            1,
+            2L
+        )
+    );
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+  }
+
   private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
   {
     ScanQuery query = new Druids.ScanQueryBuilder().dataSource(