From 19e3fbf24939672fa9d77f912e1b95606e2f80bf Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 13 Mar 2019 18:32:30 -0700 Subject: [PATCH 1/2] Fix KafkaRecordSupplier assign --- .../indexing/kafka/KafkaRecordSupplier.java | 1 - .../indexing/kafka/KafkaIndexTaskTest.java | 63 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 6c3d053a3bb5..be25c49eba64 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -69,7 +69,6 @@ public void assign(Set> streamPartitions) .stream() .map(x -> new TopicPartition(x.getStream(), x.getPartitionId())) .collect(Collectors.toSet())); - seekToEarliest(streamPartitions); } @Override diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index a0caebc7dee6..f9c00fbfb42c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2221,6 +2221,69 @@ public void testRunTransactionModeRollback() throws Exception Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4)); } + @Test(timeout = 60_000L) + public void testCanStartFromLaterThanEarliestOffset() throws Exception + { + if (!isIncrementalHandoffSupported) { + return; + } + final String baseSequenceName = "sequence0"; + maxRowsPerSegment = Integer.MAX_VALUE; + maxTotalRows = null; + + // Insert data + int numToAdd = records.size(); + + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (int i = 0; i < numToAdd; i++) { + kafkaProducer.send(records.get(i)).get(); + } + kafkaProducer.commitTransaction(); + } + + Map consumerProps = kafkaServer.consumerProperties(); + consumerProps.put("max.poll.records", "1"); + + final SeekableStreamPartitions startPartitions = new SeekableStreamPartitions<>( + topic, + ImmutableMap.of( + 0, + 0L, + 1, + 1L + ) + ); + + final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( + topic, + ImmutableMap.of( + 0, + 10L, + 1, + 2L + ) + ); + + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + baseSequenceName, + startPartitions, + endPartitions, + consumerProps, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null + ) + ); + final ListenableFuture future = runTask(task); + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + } + private List scanData(final Task task, QuerySegmentSpec spec) { ScanQuery query = new Druids.ScanQueryBuilder().dataSource( From 82b7b85b8459e83b9785b558dcbe24e4f3efa3b0 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 13 Mar 2019 21:45:32 -0700 Subject: [PATCH 2/2] TeamCity fix --- .../org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index f9c00fbfb42c..c4c446dac096 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2237,8 +2237,8 @@ public void testCanStartFromLaterThanEarliestOffset() throws Exception try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { kafkaProducer.initTransactions(); kafkaProducer.beginTransaction(); - for (int i = 0; i < numToAdd; i++) { - kafkaProducer.send(records.get(i)).get(); + for (ProducerRecord record : records) { + kafkaProducer.send(record).get(); } kafkaProducer.commitTransaction(); }