From 08238bc59c3e13daea66526d2a5434b1b4742b57 Mon Sep 17 00:00:00 2001
From: Surekha Saharan
Date: Fri, 19 Oct 2018 16:16:49 -0700
Subject: [PATCH 01/16] Support Kafka transactional topics

* Update Kafka to version 2.0.0
* Remove the skipOffsetGaps option since it is no longer used
* Adjust the Kafka consumer to use transactional semantics
* Update tests
---
 .../extensions-core/kafka-ingestion.md        |   1 -
 .../kafka-indexing-service/pom.xml            |   2 +-
 ...ementalPublishingKafkaIndexTaskRunner.java |  39 +-
 .../druid/indexing/kafka/KafkaIOConfig.java   |  12 +-
 .../druid/indexing/kafka/KafkaIndexTask.java  |   1 +
 .../kafka/LegacyKafkaIndexTaskRunner.java     |  19 +-
 .../kafka/supervisor/KafkaSupervisor.java     |   3 +-
 .../supervisor/KafkaSupervisorIOConfig.java   |  12 +-
 .../indexing/kafka/KafkaIOConfigTest.java     |   6 +-
 .../indexing/kafka/KafkaIndexTaskTest.java    | 595 ++++++++++--------
 .../KafkaSupervisorIOConfigTest.java          |   5 +-
 .../kafka/supervisor/KafkaSupervisorTest.java | 103 ++-
 .../druid/indexing/kafka/test/TestBroker.java |  11 +-
 pom.xml                                       |   2 +-
 14 files changed, 411 insertions(+), 400 deletions(-)

diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index 12bd5f637de4..bb4e3fcf3ad9 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -176,7 +176,6 @@ For Roaring bitmaps:
 |`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped.|no (default == none)|
-|`skipOffsetGaps`|Boolean|Whether or not to allow gaps of missing offsets in the Kafka stream. This is required for compatibility with implementations such as MapR Streams which does not guarantee consecutive offsets. If this is false, an exception will be thrown if offsets are not consecutive.|no (default == false)|

 ## Supervisor API
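For readers who have not used Kafka transactions before, here is a minimal, self-contained sketch (not part of the patch itself) of the client-side semantics this change relies on: a transactional producer that commits or aborts a batch atomically, and a consumer configured with `isolation.level=read_committed`, matching the property this patch adds in `KafkaIndexTask.newConsumer()`. The topic name, bootstrap address, group id, and transactional id below are placeholders, not values from this patch.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionalRoundTrip
{
  public static void main(String[] args)
  {
    final String topic = "example-topic"; // placeholder

    final Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    producerProps.setProperty("key.serializer", ByteArraySerializer.class.getName());
    producerProps.setProperty("value.serializer", ByteArraySerializer.class.getName());
    // Setting a transactional.id is what enables initTransactions()/beginTransaction().
    producerProps.setProperty("transactional.id", "example-transaction"); // placeholder

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
      producer.initTransactions();
      producer.beginTransaction();
      producer.send(new ProducerRecord<>(topic, "row-1".getBytes(StandardCharsets.UTF_8)));
      producer.send(new ProducerRecord<>(topic, "row-2".getBytes(StandardCharsets.UTF_8)));
      // commitTransaction() makes both records visible atomically;
      // abortTransaction() would hide both from read_committed consumers.
      producer.commitTransaction();
    }

    final Properties consumerProps = new Properties();
    consumerProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    consumerProps.setProperty("group.id", "example-group"); // placeholder
    consumerProps.setProperty("auto.offset.reset", "earliest");
    consumerProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
    consumerProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
    // Same setting the patch adds to KafkaIndexTask.newConsumer(): only records from
    // committed transactions are returned; records from aborted transactions are skipped.
    consumerProps.setProperty("isolation.level", "read_committed");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
      consumer.subscribe(Collections.singletonList(topic));
      final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.printf("offset %d: %s%n", record.offset(), new String(record.value(), StandardCharsets.UTF_8));
      }
    }
  }
}
```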
diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml
index ff22c8d737b8..100609230b71 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -34,7 +34,7 @@
-        0.10.2.0
+        2.0.0

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 34eca06c626e..87b5baa9a6ec 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -99,6 +99,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -429,7 +430,7 @@ public void run()
       }

       // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
-      if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
+      if (stopRequested.get() || sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
         status = Status.PUBLISHING;
         break;
       }
@@ -447,42 +448,26 @@ public void run()
         // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
         ConsumerRecords records = ConsumerRecords.empty();
         try {
-          records = consumer.poll(KafkaIndexTask.POLL_TIMEOUT_MILLIS);
+          records = consumer.poll(Duration.ofMillis(KafkaIndexTask.POLL_TIMEOUT_MILLIS));
         }
         catch (OffsetOutOfRangeException e) {
           log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
           possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
           stillReading = !assignment.isEmpty();
         }

         SequenceMetadata sequenceToCheckpoint = null;
+        int currentPartition = 0;
+        long currentOffset = 0;
         for (ConsumerRecord record : records) {
+          currentPartition = record.partition();
+          currentOffset = record.offset();
           log.trace(
               "Got topic[%s] partition[%d] offset[%,d].",
               record.topic(),
               record.partition(),
               record.offset()
           );

           if (record.offset() < endOffsets.get(record.partition())) {
-            if (record.offset() != nextOffsets.get(record.partition())) {
-              if (ioConfig.isSkipOffsetGaps()) {
-                log.warn(
-                    "Skipped to offset[%,d] after offset[%,d] in partition[%d].",
-                    record.offset(),
-                    nextOffsets.get(record.partition()),
-                    record.partition()
-                );
-              } else {
-                throw new ISE(
-                    "WTF?!
Got offset[%,d] after offset[%,d] in partition[%d].",
-                    record.offset(),
-                    nextOffsets.get(record.partition()),
-                    record.partition()
-                );
-              }
-            }
-
             try {
               final byte[] valueBytes = record.value();
               final List rows = valueBytes == null
@@ -574,15 +559,19 @@ public void onFailure(Throwable t)
             nextOffsets.put(record.partition(), record.offset() + 1);
           }
-
-          if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition()))
+          if (nextOffsets.get(record.partition()) >= endOffsets.get(record.partition())
               && assignment.remove(record.partition())) {
             log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
             KafkaIndexTask.assignPartitions(consumer, topic, assignment);
             stillReading = !assignment.isEmpty();
           }
         }
-
+        // With transactional topics the last polled record can jump past a partition's
+        // end offset without a record ever arriving exactly at it; clamp nextOffsets
+        // back to the end offset so the partition can still be marked as finished.
+        if (nextOffsets.get(currentPartition) != null
+            && endOffsets.get(currentPartition) != null
+            && nextOffsets.get(currentPartition) < endOffsets.get(currentPartition)
+            && currentOffset > endOffsets.get(currentPartition)) {
+          nextOffsets.put(currentPartition, endOffsets.get(currentPartition));
+        }
         if (System.currentTimeMillis() > nextCheckpointTime) {
           sequenceToCheckpoint = sequences.get(sequences.size() - 1);
         }
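The clamp above, and the switch from `equals` to `>=`, exist because the offsets seen by a `read_committed` consumer on a transactional topic are no longer consecutive: every committed or aborted transaction writes a control record that occupies an offset but is never handed to the application, and records from aborted transactions are skipped entirely. A small sketch of the resulting arithmetic, using hypothetical offsets rather than anything from this patch:

```java
public class OffsetGapSketch
{
  public static void main(String[] args)
  {
    // Hypothetical partition layout: a transaction wrote rows at offsets 0 and 1,
    // its commit marker occupies offset 2, and the task's end offset is 2.
    final long endOffset = 2L;
    final long positionAfterPoll = 3L; // the consumer skips the marker and lands on 3

    // A strict equality test (nextOffset == endOffset) would never fire, so the task
    // could not finish; comparing with >= and capping at the end offset resolves it.
    final long nextOffset = Math.min(positionAfterPoll, endOffset);
    System.out.println(nextOffset >= endOffset); // true -> partition is done
  }
}
```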
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
index 6a9af7fcea94..322cde1870aa 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIOConfig.java
@@ -43,7 +43,6 @@ public class KafkaIOConfig implements IOConfig
   private final boolean useTransaction;
   private final Optional minimumMessageTime;
   private final Optional maximumMessageTime;
-  private final boolean skipOffsetGaps;

   @JsonCreator
   public KafkaIOConfig(
@@ -54,8 +53,7 @@ public KafkaIOConfig(
       @JsonProperty("consumerProperties") Map consumerProperties,
       @JsonProperty("useTransaction") Boolean useTransaction,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
-      @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
-      @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
+      @JsonProperty("maximumMessageTime") DateTime maximumMessageTime
   )
   {
     this.taskGroupId = taskGroupId;
@@ -66,7 +64,6 @@ public KafkaIOConfig(
     this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
     this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
     this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
-    this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS;

     Preconditions.checkArgument(
         startPartitions.getTopic().equals(endPartitions.getTopic()),
@@ -137,12 +134,6 @@ public Optional getMinimumMessageTime()
     return minimumMessageTime;
   }

-  @JsonProperty
-  public boolean isSkipOffsetGaps()
-  {
-    return skipOffsetGaps;
-  }
-
   @Override
   public String toString()
   {
@@ -155,7 +146,6 @@ public String toString()
            ", useTransaction=" + useTransaction +
            ", minimumMessageTime=" + minimumMessageTime +
            ", maximumMessageTime=" + maximumMessageTime +
-           ", skipOffsetGaps=" + skipOffsetGaps +
            '}';
   }
 }

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index bb73651e6e87..50b44ab6a80c 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -296,6 +296,7 @@ KafkaConsumer newConsumer()
     props.setProperty("auto.offset.reset", "none");
     props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
     props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
+    props.setProperty("isolation.level", "read_committed");

     return new KafkaConsumer<>(props);
   }

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index 88dfe70ef7b5..6db22b044dbd 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -390,23 +390,6 @@ public void run()
       }

       if (record.offset() < endOffsets.get(record.partition())) {
-        if (record.offset() != nextOffsets.get(record.partition())) {
-          if (ioConfig.isSkipOffsetGaps()) {
-            log.warn(
-                "Skipped to offset[%,d] after offset[%,d] in partition[%d].",
-                record.offset(),
-                nextOffsets.get(record.partition()),
-                record.partition()
-            );
-          } else {
-            throw new ISE(
-                "WTF?!
Got offset[%,d] after offset[%,d] in partition[%d].",
-                record.offset(),
-                nextOffsets.get(record.partition()),
-                record.partition()
-            );
-          }
-        }

         try {
           final byte[] valueBytes = record.value();
@@ -467,7 +450,7 @@ public void run()
           nextOffsets.put(record.partition(), record.offset() + 1);
         }

-        if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition()))
+        if (nextOffsets.get(record.partition()) >= (endOffsets.get(record.partition()))
             && assignment.remove(record.partition())) {
           log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
           KafkaIndexTask.assignPartitions(consumer, topic, assignment);

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index b7845cae2206..38312ccc627a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -1930,8 +1930,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
         consumerProperties,
         true,
         minimumMessageTime,
-        maximumMessageTime,
-        ioConfig.isSkipOffsetGaps()
+        maximumMessageTime
     );

     final String checkpoints = sortingMapper.writerWithType(new TypeReference>>()

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 44c2bb2d6f73..12975842ab37 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -47,7 +47,6 @@ public class KafkaSupervisorIOConfig
   private final Duration completionTimeout;
   private final Optional lateMessageRejectionPeriod;
   private final Optional earlyMessageRejectionPeriod;
-  private final boolean skipOffsetGaps;

   @JsonCreator
   public KafkaSupervisorIOConfig(
@@ -61,8 +60,7 @@ public KafkaSupervisorIOConfig(
       @JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
       @JsonProperty("completionTimeout") Period completionTimeout,
       @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
-      @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
-      @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
+      @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod
   )
   {
     this.topic = Preconditions.checkNotNull(topic, "topic");
@@ -85,7 +83,6 @@ public KafkaSupervisorIOConfig(
     this.earlyMessageRejectionPeriod = earlyMessageRejectionPeriod == null
                                        ? Optional.absent()
                                        : Optional.of(earlyMessageRejectionPeriod.toStandardDuration());
-    this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
  }

  @JsonProperty
@@ -154,12 +151,6 @@ public Optional getLateMessageRejectionPeriod()
     return lateMessageRejectionPeriod;
   }

-  @JsonProperty
-  public boolean isSkipOffsetGaps()
-  {
-    return skipOffsetGaps;
-  }
-
   @Override
   public String toString()
   {
@@ -174,7 +165,6 @@ public String toString()
            ", useEarliestOffset=" + useEarliestOffset +
            ", completionTimeout=" + completionTimeout +
            ", lateMessageRejectionPeriod=" + lateMessageRejectionPeriod +
-           ", skipOffsetGaps=" + skipOffsetGaps +
            '}';
   }

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
index 2321974ff383..9fc0fe86b7fa 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -74,8 +74,7 @@ public void testSerdeWithDefaults() throws Exception
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
     Assert.assertTrue(config.isUseTransaction());
     Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent());
-    Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
   }

   @Test
@@ -90,8 +88,7 @@ public void testSerdeWithNonDefaults() throws Exception
       + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
       + "  \"useTransaction\": false,\n"
       + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
-      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n"
-      + "  \"skipOffsetGaps\": true\n"
+      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
       + "}";

     KafkaIOConfig config = (KafkaIOConfig) mapper.readValue(
@@ -112,7 +109,6 @@ public void testSerdeWithNonDefaults() throws Exception
     Assert.assertFalse(config.isUseTransaction());
     Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get());
     Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
-    Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
   }

   @Test
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 6dd210ae4c59..7901bcebc756 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -99,13 +99,16 @@
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.TestDerbyConnector;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.QueryWatcher;
 import
org.apache.druid.query.Result; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -113,6 +116,13 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.scan.ScanResultValue; +import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; @@ -169,10 +179,12 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.druid.query.QueryPlus.wrap; @@ -371,11 +383,7 @@ public static void tearDownClass() throws Exception public void testRunAfterDataInserted() throws Exception { // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -387,8 +395,7 @@ public void testRunAfterDataInserted() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -429,8 +436,7 @@ public void testRunBeforeDataInserted() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -442,11 +448,7 @@ public void testRunBeforeDataInserted() throws Exception } // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -481,11 +483,7 @@ public void testIncrementalHandOff() throws Exception maxRowsPerSegment = 2; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); Map consumerProps = kafkaServer.consumerProperties(); consumerProps.put("max.poll.records", "1"); @@ -505,8 +503,7 @@ public void testIncrementalHandOff() throws Exception consumerProps, true, null, - null, - false + null ) ); final ListenableFuture future = runTask(task); @@ -577,108 +574,115 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception int numToAdd = records.size() - 2; try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (int i = 0; i < numToAdd; i++) { kafkaProducer.send(records.get(i)).get(); } + kafkaProducer.commitTransaction(); + } - Map consumerProps = kafkaServer.consumerProperties(); - consumerProps.put("max.poll.records", "1"); - - final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L)); - final KafkaPartitions 
checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 3L, 1, 0L)); - final KafkaPartitions checkpoint2 = new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 0L)); - - final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 2L)); - final KafkaIndexTask task = createTask( - null, - new KafkaIOConfig( - 0, - baseSequenceName, - startPartitions, - endPartitions, - consumerProps, - true, - null, - null, - false - ) - ); - final ListenableFuture future = runTask(task); - while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) { - Thread.sleep(10); - } - final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + Map consumerProps = kafkaServer.consumerProperties(); + consumerProps.put("max.poll.records", "1"); - Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets)); - task.getRunner().setEndOffsets(currentOffsets, false); + final KafkaPartitions startPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L)); + final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 3L, 1, 0L)); + final KafkaPartitions checkpoint2 = new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 0L)); - while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) { - Thread.sleep(10); - } + final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 2L)); + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + 0, + baseSequenceName, + startPartitions, + endPartitions, + consumerProps, + true, + null, + null + ) + ); + final ListenableFuture future = runTask(task); + while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) { + Thread.sleep(10); + } + final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + + Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets)); + task.getRunner().setEndOffsets(currentOffsets, false); + + while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) { + Thread.sleep(10); + } - // add remaining records + // add remaining records + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (int i = numToAdd; i < records.size(); i++) { kafkaProducer.send(records.get(i)).get(); } - final Map nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); - - Assert.assertTrue(checkpoint2.getPartitionOffsetMap().equals(nextOffsets)); - task.getRunner().setEndOffsets(nextOffsets, false); - - Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - - Assert.assertEquals(2, checkpointRequestsHash.size()); - Assert.assertTrue( - checkpointRequestsHash.contains( - Objects.hash( - DATA_SCHEMA.getDataSource(), - 0, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) - ) - ) - ); - Assert.assertTrue( - checkpointRequestsHash.contains( - Objects.hash( - DATA_SCHEMA.getDataSource(), - 0, - new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)), - new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) - ) - ) - ); + kafkaProducer.commitTransaction(); + } + final Map nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); - // Check metrics - Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed()); - Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable()); - 
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway()); - - // Check published metadata - SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); - SegmentDescriptor desc2 = SD(task, "2009/P1D", 0); - SegmentDescriptor desc3 = SD(task, "2010/P1D", 0); - SegmentDescriptor desc4 = SD(task, "2011/P1D", 0); - SegmentDescriptor desc5 = SD(task, "2011/P1D", 1); - SegmentDescriptor desc6 = SD(task, "2012/P1D", 0); - SegmentDescriptor desc7 = SD(task, "2013/P1D", 0); - Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors()); - Assert.assertEquals( - new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 2L))), - metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) - ); + Assert.assertTrue(checkpoint2.getPartitionOffsetMap().equals(nextOffsets)); + task.getRunner().setEndOffsets(nextOffsets, false); - // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1)); - Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2)); - Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3)); - Assert.assertTrue((ImmutableList.of("d", "e").equals(readSegmentColumn("dim1", desc4)) - && ImmutableList.of("h").equals(readSegmentColumn("dim1", desc5))) || - (ImmutableList.of("d", "h").equals(readSegmentColumn("dim1", desc4)) - && ImmutableList.of("e").equals(readSegmentColumn("dim1", desc5)))); - Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc6)); - Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7)); - } + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + + Assert.assertEquals(2, checkpointRequestsHash.size()); + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + ) + ) + ); + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) + ) + ) + ); + + // Check metrics + Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2009/P1D", 0); + SegmentDescriptor desc3 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc4 = SD(task, "2011/P1D", 0); + SegmentDescriptor desc5 = SD(task, "2011/P1D", 1); + SegmentDescriptor desc6 = SD(task, "2012/P1D", 0); + SegmentDescriptor desc7 = SD(task, "2013/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 2L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2)); + Assert.assertEquals(ImmutableList.of("c"), 
readSegmentColumn("dim1", desc3)); + Assert.assertTrue((ImmutableList.of("d", "e").equals(readSegmentColumn("dim1", desc4)) + && ImmutableList.of("h").equals(readSegmentColumn("dim1", desc5))) || + (ImmutableList.of("d", "h").equals(readSegmentColumn("dim1", desc4)) + && ImmutableList.of("e").equals(readSegmentColumn("dim1", desc5)))); + Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc6)); + Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7)); } @Test(timeout = 60_000L) @@ -693,11 +697,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception intermediateHandoffPeriod = new Period().withSeconds(0); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records.subList(0, 2)) { - kafkaProducer.send(record).get(); - } - } + insertData(); Map consumerProps = kafkaServer.consumerProperties(); consumerProps.put("max.poll.records", "1"); @@ -715,8 +715,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception consumerProps, true, null, - null, - false + null ) ); final ListenableFuture future = runTask(task); @@ -774,8 +773,7 @@ public void testRunWithMinimumMessageTime() throws Exception kafkaServer.consumerProperties(), true, DateTimes.of("2010"), - null, - false + null ) ); @@ -787,11 +785,7 @@ public void testRunWithMinimumMessageTime() throws Exception } // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -828,8 +822,7 @@ public void testRunWithMaximumMessageTime() throws Exception kafkaServer.consumerProperties(), true, null, - DateTimes.of("2010"), - false + DateTimes.of("2010") ) ); @@ -841,11 +834,7 @@ public void testRunWithMaximumMessageTime() throws Exception } // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -892,8 +881,7 @@ public void testRunWithTransformSpec() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -905,11 +893,7 @@ public void testRunWithTransformSpec() throws Exception } // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -936,11 +920,7 @@ public void testRunWithTransformSpec() throws Exception public void testRunOnNothing() throws Exception { // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -952,8 +932,7 @@ public void testRunOnNothing() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -977,11 +956,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception handoffConditionTimeout = 5_000; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - 
kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -993,8 +968,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -1029,11 +1003,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio handoffConditionTimeout = 100; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1045,8 +1015,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -1084,11 +1053,7 @@ public void testReportParseExceptions() throws Exception maxSavedParseExceptions = 2; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1100,8 +1065,7 @@ public void testReportParseExceptions() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -1128,11 +1092,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception maxSavedParseExceptions = 6; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1144,8 +1104,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -1210,11 +1169,7 @@ public void testMultipleParseExceptionsFailure() throws Exception maxSavedParseExceptions = 2; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1226,8 +1181,7 @@ public void testMultipleParseExceptionsFailure() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -1286,8 +1240,7 @@ public void testRunReplicas() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); final KafkaIndexTask task2 = createTask( @@ -1300,8 +1253,7 @@ public void testRunReplicas() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -1309,11 +1261,7 @@ public void testRunReplicas() throws Exception final ListenableFuture future2 = runTask(task2); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for tasks to exit Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode()); @@ -1354,8 +1302,7 @@ public void testRunConflicting() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); final KafkaIndexTask task2 = createTask( @@ -1368,17 +1315,12 @@ public void testRunConflicting() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - 
kafkaProducer.send(record).get(); - } - } + insertData(); // Run first task final ListenableFuture future1 = runTask(task1); @@ -1423,8 +1365,7 @@ public void testRunConflictingWithoutTransactions() throws Exception kafkaServer.consumerProperties(), false, null, - null, - false + null ) ); final KafkaIndexTask task2 = createTask( @@ -1437,17 +1378,12 @@ public void testRunConflictingWithoutTransactions() throws Exception kafkaServer.consumerProperties(), false, null, - null, - false + null ) ); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Run first task final ListenableFuture future1 = runTask(task1); @@ -1497,20 +1433,14 @@ public void testRunOneTaskTwoPartitions() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); final ListenableFuture future = runTask(task); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - kafkaProducer.flush(); - } + insertData(); // Wait for tasks to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -1562,8 +1492,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); final KafkaIndexTask task2 = createTask( @@ -1576,8 +1505,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -1585,11 +1513,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception final ListenableFuture future2 = runTask(task2); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for tasks to exit Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode()); @@ -1628,12 +1552,11 @@ public void testRestore() throws Exception 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), - new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), + new KafkaPartitions(topic, ImmutableMap.of(0, 6L)), kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -1641,9 +1564,12 @@ public void testRestore() throws Exception // Insert some data, but not enough for the task to finish try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (ProducerRecord record : Iterables.limit(records, 4)) { kafkaProducer.send(record).get(); } + kafkaProducer.commitTransaction(); } while (countEvents(task1) != 2) { @@ -1665,12 +1591,11 @@ public void testRestore() throws Exception 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), - new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), + new KafkaPartitions(topic, ImmutableMap.of(0, 6L)), kafkaServer.consumerProperties(), true, null, - null, - false + null ) ); @@ -1678,9 +1603,12 @@ public void testRestore() throws Exception // Insert remaining data try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (ProducerRecord record : Iterables.skip(records, 4)) { kafkaProducer.send(record).get(); } + kafkaProducer.commitTransaction(); } // Wait for task to exit @@ -1699,7 +1627,7 @@ public void testRestore() 
throws Exception
     SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 6L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
@@ -1708,7 +1636,7 @@ public void testRestore() throws Exception
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
   }

   @Test(timeout = 60_000L)
   public void testRunWithPauseAndResume() throws Exception
   {
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
             0,
             "sequence0",
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
-            new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 6L)),
             kafkaServer.consumerProperties(),
             true,
             null,
-            null,
-            false
+            null
         )
     );
@@ -1730,10 +1657,13 @@ public void testRunWithPauseAndResume() throws Exception
     // Insert some data, but not enough for the task to finish
     try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (ProducerRecord record : Iterables.limit(records, 4)) {
         kafkaProducer.send(record).get();
       }
       kafkaProducer.flush();
+      kafkaProducer.commitTransaction();
     }

     while (countEvents(task) != 2) {
@@ -1750,12 +1680,14 @@
         }
     );
     Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
-
     // Insert remaining data
     try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
       for (ProducerRecord record : Iterables.skip(records, 4)) {
         kafkaProducer.send(record).get();
       }
+      kafkaProducer.commitTransaction();
     }

     try {
@@ -1783,7 +1715,7 @@ public void testRunWithPauseAndResume() throws Exception
     SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 6L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
@@ -1805,8 +1737,7 @@
             kafkaServer.consumerProperties(),
             true,
             null,
-            null,
-            false
+            null
         )
     );
@@ -1828,11 +1759,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
   {
     resetOffsetAutomatically = true;
     // Insert data
-    try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();

     final KafkaIndexTask task = createTask(
         null,
@@ -1844,8 +1771,7 @@
             kafkaServer.consumerProperties(),
             true,
             null,
-            null,
-            false
+            null
         )
     );
@@ -1872,11 +1798,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
       return;
     }
     // Insert data
-    try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();

     final TreeMap> sequences = new TreeMap<>();
    // Here the sequence number is 1 meaning that one incremental handoff was done by the failed task
@@ -1898,8 +1820,7 @@ public void testRunContextSequenceAheadOfStartingOffsets()
             kafkaServer.consumerProperties(),
             true,
             null,
-            null,
-            false
+            null
         ),
         context
     );
@@ -1928,6 +1849,133 @@ public void testRunContextSequenceAheadOfStartingOffsets()
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
   }

+  @Test(timeout = 60_000L)
+  public void testRunTransactionModeRollback() throws Exception
+  {
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIOConfig(
+            0,
+            "sequence0",
+            new KafkaPartitions(topic, ImmutableMap.of(0, 0L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 13L)),
+            kafkaServer.consumerProperties(),
+            true,
+            null,
+            null
+        )
+    );
+
+    final ListenableFuture future = runTask(task);
+
+    // Insert two records initially
+    try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord record : Iterables.limit(records, 2)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    while (countEvents(task) != 2) {
+      Thread.sleep(25);
+    }
+
+    Assert.assertEquals(2, countEvents(task));
+    Assert.assertEquals(KafkaIndexTask.Status.READING, task.getRunner().getStatus());
+
+    // Verify the two records indexed so far
+    final QuerySegmentSpec firstInterval = objectMapper.readValue(
+        "\"2008/2010\"", QuerySegmentSpec.class
+    );
+    Iterable scanResultValues = scanData(task, firstInterval);
+    Assert.assertEquals(2, Iterables.size(scanResultValues));
+
+    // Insert three more records and roll back
+    try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord record : Iterables.limit(Iterables.skip(records, 2), 3)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.flush();
+      kafkaProducer.abortTransaction();
+    }
+
+    Assert.assertEquals(2, countEvents(task));
+    Assert.assertEquals(KafkaIndexTask.Status.READING, task.getRunner().getStatus());
+
+    final QuerySegmentSpec rolledBackInterval = objectMapper.readValue(
+        "\"2010/2012\"", QuerySegmentSpec.class
+    );
+    scanResultValues = scanData(task, rolledBackInterval);
+    // Verify that no records were indexed in the rolled-back time period
+    Assert.assertEquals(0, Iterables.size(scanResultValues));
+
+    // Insert remaining data
+    try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord record : Iterables.skip(records, 5)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+    Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
+
+    // Check metrics
+    Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published metadata
+    SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
+    SegmentDescriptor desc2 = SD(task, "2009/P1D", 0);
+    SegmentDescriptor desc3 = SD(task, "2013/P1D", 0);
+    SegmentDescriptor desc4 = SD(task, "2049/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
+    Assert.assertEquals(
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 13L))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+
+    // Check segments in deep storage
+    Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1));
+    Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2));
+    Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc3));
+    Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4));
+  }
+
+  private List scanData(final Task task, QuerySegmentSpec spec)
+  {
+    ScanQuery query = new ScanQuery.ScanQueryBuilder().dataSource(
+        DATA_SCHEMA.getDataSource()).intervals(spec).build();
+    List results =
+        task.getQueryRunner(query).run(wrap(query), new HashMap<>()).toList();
+    return results;
+  }
+
+  private void insertData() throws ExecutionException, InterruptedException
+  {
+    try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord record : records) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+  }
+
   private ListenableFuture runTask(final Task task)
   {
     try {
@@ -2093,7 +2141,7 @@ private static DataSchema cloneDataSchema(final DataSchema dataSchema)
     );
   }

-  private QueryRunnerFactoryConglomerate makeTimeseriesOnlyConglomerate()
+  private QueryRunnerFactoryConglomerate makeTimeseriesAndScanConglomerate()
   {
     IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
         null,
@@ -2110,16 +2158,33 @@
       public QueryRunner decorate(
       }
     };
     return new DefaultQueryRunnerFactoryConglomerate(
-        ImmutableMap.of(
-            TimeseriesQuery.class,
-            new TimeseriesQueryRunnerFactory(
-                new TimeseriesQueryQueryToolChest(queryRunnerDecorator),
-                new TimeseriesQueryEngine(),
-                (query, future) -> {
-                  // do nothing
-                }
+        ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
+            .put(
+                TimeseriesQuery.class,
+                new TimeseriesQueryRunnerFactory(
+                    new TimeseriesQueryQueryToolChest(queryRunnerDecorator),
+                    new TimeseriesQueryEngine(),
+                    new QueryWatcher()
+                    {
+                      @Override
+                      public void registerQuery(Query query, ListenableFuture future)
+                      {
+                        // do nothing
+                      }
+                    }
+                )
             )
-        )
+            .put(
+                ScanQuery.class,
+                new ScanQueryRunnerFactory(
+                    new ScanQueryQueryToolChest(
+                        new ScanQueryConfig(),
+                        new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
+                    ),
+                    new ScanQueryEngine()
+                )
+            )
+            .build()
     );
   }
@@ -2248,7 +2313,7 @@ public List getLocations()
         new TestDataSegmentAnnouncer(),
         EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
         handoffNotifierFactory,
-        this::makeTimeseriesOnlyConglomerate,
+        this::makeTimeseriesAndScanConglomerate,
         MoreExecutors.sameThreadExecutor(), // queryExecutorService
         EasyMock.createMock(MonitorScheduler.class),
         new SegmentLoaderFactory(
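A note on the offset arithmetic behind the test changes in this file and in the supervisor tests below (end offsets moving from 5L to 6L, expected latest offsets moving from 1100 to 1101 and from 6 to 7): each committed or aborted transaction appends one control record per partition, which occupies an offset but is never delivered to the application. A sketch of that arithmetic, with counts taken from the tests but the framing otherwise illustrative:

```java
public class ControlRecordMath
{
  public static void main(String[] args)
  {
    // One committed transaction carrying N data records, e.g. the 1,100 events
    // the supervisor test inserts in a single transaction.
    final long dataRecords = 1100;
    final long commitMarkers = 1; // one control record per committed transaction

    // The log-end offset now includes the marker, which is why the expected
    // "latest offset" moves from 1100 to 1101 (and, with six events and one
    // transaction, from 6 to 7 in the lag calculations).
    final long logEndOffset = dataRecords + commitMarkers;
    System.out.println(logEndOffset); // 1101
  }
}
```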
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index a7dc2041b8a4..350ba74f60a4 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -77,7 +77,6 @@ public void testSerdeWithDefaults() throws Exception
     Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout());
     Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent());
     Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent());
-    Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
   }

   @Test
@@ -95,8 +94,7 @@ public void testSerdeWithNonDefaults() throws Exception
       + "  \"useEarliestOffset\": true,\n"
       + "  \"completionTimeout\": \"PT45M\",\n"
       + "  \"lateMessageRejectionPeriod\": \"PT1H\",\n"
-      + "  \"earlyMessageRejectionPeriod\": \"PT1H\",\n"
-      + "  \"skipOffsetGaps\": true\n"
+      + "  \"earlyMessageRejectionPeriod\": \"PT1H\"\n"
       + "}";

     KafkaSupervisorIOConfig config = mapper.readValue(
@@ -119,7 +117,6 @@ public void testSerdeWithNonDefaults() throws Exception
     Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout());
     Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get());
     Assert.assertEquals(Duration.standardHours(1), config.getEarlyMessageRejectionPeriod().get());
-    Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
   }

   @Test
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c4e24f185cc9..7a9c756593f4 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -243,7 +243,7 @@ public static void tearDownClass() throws IOException
   @Test
   public void testNoInitialState() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(1);

     Capture captured = Capture.newInstance();
@@ -274,7 +274,6 @@ public void testNoInitialState() throws Exception
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
     Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
-    Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());

     Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic());
     Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
@@ -290,7 +289,7 @@
   @Test
   public void testSkipOffsetGaps() throws Exception
   {
-    supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true);
+    supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
     addSomeEvents(1);

     Capture captured = Capture.newInstance();
@@ -313,13 +312,12 @@
     KafkaIndexTask task = captured.getValue();
     KafkaIOConfig taskConfig = task.getIOConfig();
-    Assert.assertTrue("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
   }

   @Test
   public void testMultiTask() throws Exception
   {
- supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -356,7 +354,7 @@ public void testMultiTask() throws Exception @Test public void testReplicas() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -393,7 +391,7 @@ public void testReplicas() throws Exception @Test public void testLateMessageRejectionPeriod() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -432,7 +430,7 @@ public void testLateMessageRejectionPeriod() throws Exception @Test public void testEarlyMessageRejectionPeriod() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"), false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H")); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -474,7 +472,7 @@ public void testEarlyMessageRejectionPeriod() throws Exception */ public void testLatestOffset() throws Exception { - supervisor = getSupervisor(1, 1, false, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, false, "PT1H", null, null); addSomeEvents(1100); Capture captured = Capture.newInstance(); @@ -494,9 +492,9 @@ public void testLatestOffset() throws Exception verifyAll(); KafkaIndexTask task = captured.getValue(); - Assert.assertEquals(1100L, (long) task.getIOConfig().getStartPartitions().getPartitionOffsetMap().get(0)); - Assert.assertEquals(1100L, (long) task.getIOConfig().getStartPartitions().getPartitionOffsetMap().get(1)); - Assert.assertEquals(1100L, (long) task.getIOConfig().getStartPartitions().getPartitionOffsetMap().get(2)); + Assert.assertEquals(1101L, (long) task.getIOConfig().getStartPartitions().getPartitionOffsetMap().get(0)); + Assert.assertEquals(1101L, (long) task.getIOConfig().getStartPartitions().getPartitionOffsetMap().get(1)); + Assert.assertEquals(1101L, (long) task.getIOConfig().getStartPartitions().getPartitionOffsetMap().get(2)); } @Test @@ -506,7 +504,7 @@ public void testLatestOffset() throws Exception */ public void testDatasourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(); @@ -536,7 +534,7 @@ public void testDatasourceMetadata() throws Exception @Test(expected = ISE.class) public void testBadMetadataOffsets() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); @@ -555,7 +553,7 @@ public void testBadMetadataOffsets() throws Exception @Test public void testKillIncompatibleTasks() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); // unexpected # of partitions (kill) @@ -657,7 +655,7 @@ public void testKillIncompatibleTasks() throws Exception @Test public void 
testKillBadPartitionAssignment() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 2, true, "PT1H", null, null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -758,7 +756,7 @@ public void testKillBadPartitionAssignment() throws Exception @Test public void testRequeueTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -839,7 +837,7 @@ public void testRequeueTaskWhenFailed() throws Exception @Test public void testRequeueAdoptedTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); DateTime now = DateTimes.nowUtc(); @@ -937,7 +935,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception @Test public void testQueueNextTasksOnSuccess() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1027,7 +1025,7 @@ public void testBeginPublishAndQueueNextTasks() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1120,7 +1118,7 @@ public void testDiscoverExistingPublishingTask() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Task task = createKafkaIndexTask( @@ -1212,7 +1210,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Task task = createKafkaIndexTask( @@ -1302,7 +1300,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(6); Task id1 = createKafkaIndexTask( @@ -1387,23 +1385,23 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception Assert.assertEquals(startTime, activeReport.getStartTime()); Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), activeReport.getStartingOffsets()); Assert.assertEquals(ImmutableMap.of(0, 4L, 1, 5L, 2, 6L), activeReport.getCurrentOffsets()); - Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), activeReport.getLag()); + Assert.assertEquals(ImmutableMap.of(0, 3L, 1, 2L, 2, 1L), activeReport.getLag()); Assert.assertEquals("id1", publishingReport.getId()); Assert.assertEquals(ImmutableMap.of(0, 0L, 1, 0L, 2, 0L), publishingReport.getStartingOffsets()); Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), publishingReport.getCurrentOffsets()); Assert.assertEquals(null, 
publishingReport.getLag()); - Assert.assertEquals(ImmutableMap.of(0, 6L, 1, 6L, 2, 6L), payload.getLatestOffsets()); - Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), payload.getMinimumLag()); - Assert.assertEquals(3L, (long) payload.getAggregateLag()); + Assert.assertEquals(ImmutableMap.of(0, 7L, 1, 7L, 2, 7L), payload.getLatestOffsets()); + Assert.assertEquals(ImmutableMap.of(0, 3L, 1, 2L, 2, 1L), payload.getMinimumLag()); + Assert.assertEquals(6L, (long) payload.getAggregateLag()); Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow()); } @Test public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1460,7 +1458,7 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1539,7 +1537,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1624,7 +1622,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception @Test(expected = IllegalStateException.class) public void testStopNotStarted() { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); supervisor.stop(false); } @@ -1636,7 +1634,7 @@ public void testStop() taskRunner.unregisterListener(StringUtils.format("KafkaSupervisor-%s", DATASOURCE)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); supervisor.start(); supervisor.stop(false); @@ -1650,7 +1648,7 @@ public void testStopGracefully() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -1751,7 +1749,7 @@ public void testResetNoTasks() throws Exception taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); supervisor.start(); supervisor.runInternal(); verifyAll(); @@ -1768,7 +1766,7 @@ public void testResetNoTasks() throws Exception @Test public void testResetDataSourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -1823,7 +1821,7 
@@ public void testResetDataSourceMetadata() throws Exception @Test public void testResetNoDataSourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -1856,7 +1854,7 @@ public void testResetRunningTasks() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -1945,7 +1943,7 @@ public void testResetRunningTasks() throws Exception public void testNoDataIngestionTasks() throws Exception { final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1S", null, null); //not adding any events Task id1 = createKafkaIndexTask( "id1", @@ -2032,7 +2030,7 @@ public void testNoDataIngestionTasks() throws Exception public void testCheckpointForInactiveTaskGroup() throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException { - supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1S", null, null); //not adding any events final Task id1 = createKafkaIndexTask( "id1", @@ -2134,7 +2132,7 @@ public void testCheckpointForInactiveTaskGroup() @Test(timeout = 60_000L) public void testCheckpointForUnknownTaskGroup() throws InterruptedException { - supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1S", null, null); //not adding any events final Task id1 = createKafkaIndexTask( "id1", @@ -2212,7 +2210,7 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException public void testCheckpointWithNullTaskGroupId() throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException { - supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false); + supervisor = getSupervisor(1, 3, true, "PT1S", null, null); //not adding any events final Task id1 = createKafkaIndexTask( "id1", @@ -2298,7 +2296,7 @@ public void testCheckpointWithNullTaskGroupId() @Test public void testSuspendedNoRunningTasks() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost); addSomeEvents(1); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -2331,7 +2329,7 @@ public void testSuspendedRunningTasks() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true, kafkaHost); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null, true, kafkaHost); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -2425,7 +2423,7 @@ public void testResetSuspended() throws Exception taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, 
true, kafkaHost); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost); supervisor.start(); supervisor.runInternal(); verifyAll(); @@ -2450,7 +2448,6 @@ public void testFailedInitializationAndRecovery() throws Exception null, null, false, - false, StringUtils.format("badhostname:%d", kafkaServer.getPort()) ); addSomeEvents(1); @@ -2516,7 +2513,6 @@ public void testFailedInitializationAndRecovery() throws Exception Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent()); Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent()); - Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps()); Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic()); Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0)); @@ -2533,6 +2529,8 @@ public void testFailedInitializationAndRecovery() throws Exception private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (int i = 0; i < NUM_PARTITIONS; i++) { for (int j = 0; j < numEventsPerPartition; j++) { kafkaProducer.send( @@ -2545,6 +2543,7 @@ private void addSomeEvents(int numEventsPerPartition) throws Exception ).get(); } } + kafkaProducer.commitTransaction(); } } @@ -2554,8 +2553,7 @@ private KafkaSupervisor getSupervisor( boolean useEarliestOffset, String duration, Period lateMessageRejectionPeriod, - Period earlyMessageRejectionPeriod, - boolean skipOffsetGaps + Period earlyMessageRejectionPeriod ) { return getSupervisor( @@ -2565,7 +2563,6 @@ private KafkaSupervisor getSupervisor( duration, lateMessageRejectionPeriod, earlyMessageRejectionPeriod, - skipOffsetGaps, false, kafkaHost ); @@ -2578,7 +2575,6 @@ private KafkaSupervisor getSupervisor( String duration, Period lateMessageRejectionPeriod, Period earlyMessageRejectionPeriod, - boolean skipOffsetGaps, boolean suspended, String kafkaHost ) @@ -2586,6 +2582,7 @@ private KafkaSupervisor getSupervisor( Map consumerProperties = new HashMap<>(); consumerProperties.put("myCustomKey", "myCustomValue"); consumerProperties.put("bootstrap.servers", kafkaHost); + consumerProperties.put("isolation.level", "read_committed"); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, replicas, @@ -2597,8 +2594,7 @@ private KafkaSupervisor getSupervisor( useEarliestOffset, new Period("PT30M"), lateMessageRejectionPeriod, - earlyMessageRejectionPeriod, - skipOffsetGaps + earlyMessageRejectionPeriod ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( @@ -2705,8 +2701,7 @@ private KafkaIndexTask createKafkaIndexTask( ImmutableMap.of(), true, minimumMessageTime, - maximumMessageTime, - false + maximumMessageTime ), Collections.emptyMap(), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java index c1a06716a3ce..45311800dc8d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java @@ -40,11 +40,12 @@ import java.io.IOException; import 
java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.ThreadLocalRandom; public class TestBroker implements Closeable { - + private static final Random RANDOM = ThreadLocalRandom.current(); private final String zookeeperConnect; private final File directory; private final boolean directoryCleanup; @@ -77,6 +78,9 @@ public void start() props.setProperty("broker.id", String.valueOf(id)); props.setProperty("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000)); props.setProperty("advertised.host.name", "localhost"); + props.setProperty("transaction.state.log.replication.factor", "1"); + props.setProperty("offsets.topic.replication.factor", "1"); + props.setProperty("transaction.state.log.min.isr", "1"); props.putAll(brokerProps); final KafkaConfig config = new KafkaConfig(props); @@ -107,6 +111,8 @@ public Map producerProperties() props.put("key.serializer", ByteArraySerializer.class.getName()); props.put("value.serializer", ByteArraySerializer.class.getName()); props.put("acks", "all"); + props.put("enable.idempotence", "true"); + props.put("transactional.id", String.valueOf(RANDOM.nextInt())); return props; } @@ -116,8 +122,9 @@ public Map consumerProperties() props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort())); props.put("key.deserializer", ByteArrayDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); - props.put("group.id", String.valueOf(ThreadLocalRandom.current().nextInt())); + props.put("group.id", String.valueOf(RANDOM.nextInt())); props.put("auto.offset.reset", "earliest"); + props.put("isolation.level", "read_committed"); return props; } diff --git a/pom.xml b/pom.xml index db0c2cd8fe7f..a8b4a5a68542 100644 --- a/pom.xml +++ b/pom.xml @@ -674,7 +674,7 @@ org.slf4j slf4j-api - 1.6.4 + 1.7.25 org.roaringbitmap From b584bf577058ea5336b46fdfc70bc8773b3164ee Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sat, 20 Oct 2018 15:16:31 -0700 Subject: [PATCH 02/16] Remove unused import from test --- .../java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 7901bcebc756..9426b877931d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -184,7 +184,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.druid.query.QueryPlus.wrap; From d2a132ffdf66db58c0fb4e1d3f87acc2352341a2 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 13 Nov 2018 13:21:02 -0800 Subject: [PATCH 03/16] Fix compilation --- .../org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 3 +-- .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 74bd746cacce..ab70b7904451 
100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -803,8 +803,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception consumerProps, true, null, - null, - false + null ) ); final ListenableFuture future = runTask(task); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 768ae908c759..593a95837f78 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2544,9 +2544,9 @@ public void testFailedInitializationAndRecovery() throws Exception } @Test - public void testGetCurrentTotalStats() throws Exception + public void testGetCurrentTotalStats() { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false, kafkaHost); supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition(0), ImmutableMap.of(0, 0L), From 0ce12e6ef630410aff8450b86fa9eab75eee6f29 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 15 Nov 2018 17:33:08 -0800 Subject: [PATCH 04/16] Invoke transaction api to fix a unit test --- .../org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index ab70b7904451..cb76e4ac6c99 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -782,9 +782,12 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception // Insert data try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (ProducerRecord record : records) { kafkaProducer.send(record).get(); } + kafkaProducer.commitTransaction(); } Map consumerProps = kafkaServer.consumerProperties(); consumerProps.put("max.poll.records", "1"); From 868ef80fc02eefe5bf46a9c7d82b40f99a6735d7 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 16 Nov 2018 14:04:27 -0800 Subject: [PATCH 05/16] temporary modification of travis.yml for debugging --- .travis.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.travis.yml b/.travis.yml index 1e17711f7060..e8ea06ce3445 100644 --- a/.travis.yml +++ b/.travis.yml @@ -122,6 +122,12 @@ matrix: script: - $TRAVIS_BUILD_DIR/ci/travis_script_integration.sh after_failure: + - for v in ~/shared/tasklogs/*.log ; do + if [ "$(grep -c "index_kafka_kafka" $v)" -gt 0 ]; then + echo $v failed-kafka-tasklog ======================== + cat $v + fi + done - for v in ~/shared/logs/*.log ; do echo $v logtail ======================== ; tail -100 $v ; done From c1a9ce52cb5bc4ec7308a29da0db5ee0cd0b4efb Mon Sep 17 00:00:00 2001 From: 
Surekha Saharan Date: Fri, 16 Nov 2018 15:14:38 -0800 Subject: [PATCH 06/16] another attempt to get travis tasklogs --- .travis.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index e8ea06ce3445..30a54210e73c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -123,10 +123,9 @@ matrix: - $TRAVIS_BUILD_DIR/ci/travis_script_integration.sh after_failure: - for v in ~/shared/tasklogs/*.log ; do - if [ "$(grep -c "index_kafka_kafka" $v)" -gt 0 ]; then - echo $v failed-kafka-tasklog ======================== - cat $v - fi + if [ "$(grep -c "index_kafka_kafka" $v)" -gt 0 ]; then + echo $v failed-kafka-tasklog ======================== ; cat $v ; + fi done - for v in ~/shared/logs/*.log ; do echo $v logtail ======================== ; tail -100 $v ; From 39d946ddbe36ebd818aa5e3e79a3f798e96fce5c Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 16 Nov 2018 16:48:39 -0800 Subject: [PATCH 07/16] update kafka to 2.0.1 at all places --- extensions-core/kafka-indexing-service/pom.xml | 2 +- integration-tests/pom.xml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml index 100609230b71..db03900eeee3 100644 --- a/extensions-core/kafka-indexing-service/pom.xml +++ b/extensions-core/kafka-indexing-service/pom.xml @@ -34,7 +34,7 @@ - 2.0.0 + 2.0.1 diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 73390ab537f1..62f5432ecd01 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -32,7 +32,7 @@ - 0.10.2.0 + 2.0.1 @@ -126,7 +126,7 @@ org.apache.kafka - kafka_2.11 + kafka_2.12 ${apache.kafka.version} From 7cb28d3344e4269e4c08f52cf2ef3b729fa64ca3 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 19 Nov 2018 13:25:19 -0800 Subject: [PATCH 08/16] Remove druid-kafka-eight dependency from integration-tests, remove the kafka firehose test and deprecate kafka-eight classes --- .../firehose/kafka/KafkaEightDruidModule.java | 1 + .../kafka/KafkaEightFirehoseFactory.java | 2 + integration-tests/pom.xml | 11 - .../druid/tests/indexer/ITKafkaTest.java | 312 ------------------ .../resources/indexer/kafka_index_task.json | 68 ---- 5 files changed, 3 insertions(+), 391 deletions(-) delete mode 100644 integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java delete mode 100644 integration-tests/src/test/resources/indexer/kafka_index_task.json diff --git a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java index 4fe379a48830..f8a5ad7d3736 100644 --- a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java +++ b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java @@ -30,6 +30,7 @@ /** */ +@Deprecated public class KafkaEightDruidModule implements DruidModule { @Override diff --git a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java index 0d0ed5863dd2..bb38c050a14a 100644 --- a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ 
-47,7 +47,9 @@ import java.util.Set; /** + * This class is deprecated and the kafka-eight module should be removed completely */ +@Deprecated public class KafkaEightFirehoseFactory implements FirehoseFactory> { private static final Logger log = new Logger(KafkaEightFirehoseFactory.class); diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 62f5432ecd01..8679687f6550 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -51,17 +51,6 @@ druid-datasketches ${project.parent.version} - - org.apache.druid.extensions - druid-kafka-eight - ${project.parent.version} - - - kafka_2.10 - org.apache.kafka - - - org.apache.druid.extensions druid-histogram diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java deleted file mode 100644 index 3388efe28670..000000000000 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.tests.indexer; - -import com.google.common.base.Throwables; -import com.google.inject.Inject; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; -import org.apache.commons.io.IOUtils; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.testing.IntegrationTestingConfig; -import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; -import org.apache.druid.testing.utils.TestQueryHelper; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.testng.annotations.AfterClass; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Callable; - -/* - * This is a test for the kafka firehose.
- */ -@Guice(moduleFactory = DruidTestModuleFactory.class) -public class ITKafkaTest extends AbstractIndexerTest -{ - private static final Logger LOG = new Logger(ITKafkaTest.class); - private static final int DELAY_BETWEEN_EVENTS_SECS = 5; - private static final String INDEXER_FILE = "/indexer/kafka_index_task.json"; - private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json"; - private static final String DATASOURCE = "kafka_test"; - private static final String TOPIC_NAME = "kafkaTopic"; - private static final int MINUTES_TO_SEND = 2; - public static final String testPropertyPrefix = "kafka.test.property."; - - - // We'll fill in the current time and numbers for added, deleted and changed - // before sending the event. - final String event_template = - "{\"timestamp\": \"%s\"," + - "\"page\": \"Gypsy Danger\"," + - "\"language\" : \"en\"," + - "\"user\" : \"nuclear\"," + - "\"unpatrolled\" : \"true\"," + - "\"newPage\" : \"true\"," + - "\"robot\": \"false\"," + - "\"anonymous\": \"false\"," + - "\"namespace\":\"article\"," + - "\"continent\":\"North America\"," + - "\"country\":\"United States\"," + - "\"region\":\"Bay Area\"," + - "\"city\":\"San Francisco\"," + - "\"added\":%d," + - "\"deleted\":%d," + - "\"delta\":%d}"; - - private String taskID; - private ZkClient zkClient; - private ZkUtils zkUtils; - private boolean segmentsExist; // to tell if we should remove segments during teardown - - // format for the querying interval - private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'"); - // format for the expected timestamp in a query response - private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'"); - private DateTime dtFirst; // timestamp of 1st event - private DateTime dtLast; // timestamp of last event - - @Inject - private TestQueryHelper queryHelper; - @Inject - private IntegrationTestingConfig config; - - @Test - public void testKafka() - { - LOG.info("Starting test: ITKafkaTest"); - - // create topic - try { - int sessionTimeoutMs = 10000; - int connectionTimeoutMs = 10000; - String zkHosts = config.getZookeeperHosts(); - zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$); - zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false); - if (config.manageKafkaTopic()) { - int numPartitions = 1; - int replicationFactor = 1; - Properties topicConfig = new Properties(); - // addFilteredProperties(topicConfig); - AdminUtils.createTopic( - zkUtils, - TOPIC_NAME, - numPartitions, - replicationFactor, - topicConfig, - RackAwareMode.Disabled$.MODULE$ - ); - } - } - catch (Exception e) { - throw new ISE(e, "could not create kafka topic"); - } - - // set up kafka producer - Properties properties = new Properties(); - addFilteredProperties(properties); - properties.put("bootstrap.servers", config.getKafkaHost()); - LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost()); - properties.put("acks", "all"); - properties.put("retries", "3"); - - KafkaProducer producer = new KafkaProducer<>( - properties, - new StringSerializer(), - new StringSerializer() - ); - - DateTimeZone zone = DateTimes.inferTzfromString("UTC"); - // format for putting into events - DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"); - - DateTime dt = new DateTime(zone); // timestamp to put on events - dtFirst = dt; // timestamp of 1st event - dtLast = dt; // timestamp of last event - - // 
these are used to compute the expected aggregations - int added = 0; - int num_events = 10; - - // send data to kafka - for (int i = 0; i < num_events; i++) { - added += i; - // construct the event to send - String event = StringUtils.format( - event_template, - event_fmt.print(dt), i, 0, i - ); - LOG.info("sending event: [%s]", event); - try { - // Send event to kafka - producer.send(new ProducerRecord(TOPIC_NAME, event)).get(); - } - catch (Exception ioe) { - throw Throwables.propagate(ioe); - } - - dtLast = dt; - dt = new DateTime(zone); - } - - producer.close(); - - String indexerSpec; - - // replace temp strings in indexer file - try { - LOG.info("indexerFile name: [%s]", INDEXER_FILE); - - Properties consumerProperties = new Properties(); - consumerProperties.put("zookeeper.connect", config.getZookeeperInternalHosts()); - consumerProperties.put("zookeeper.connection.timeout.ms", "15000"); - consumerProperties.put("zookeeper.sync.time.ms", "5000"); - consumerProperties.put("group.id", Long.toString(System.currentTimeMillis())); - consumerProperties.put("fetch.message.max.bytes", "1048586"); - consumerProperties.put("auto.offset.reset", "smallest"); - consumerProperties.put("auto.commit.enable", "false"); - - addFilteredProperties(consumerProperties); - - indexerSpec = getTaskAsString(INDEXER_FILE); - indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", DATASOURCE); - indexerSpec = StringUtils.replace(indexerSpec, "%%TOPIC%%", TOPIC_NAME); - indexerSpec = StringUtils.replace(indexerSpec, "%%COUNT%%", Integer.toString(num_events)); - String consumerPropertiesJson = jsonMapper.writeValueAsString(consumerProperties); - indexerSpec = StringUtils.replace(indexerSpec, "%%CONSUMER_PROPERTIES%%", consumerPropertiesJson); - - LOG.info("indexerFile: [%s]\n", indexerSpec); - } - catch (Exception e) { - // log here so the message will appear in the console output - LOG.error("could not read indexer file [%s]", INDEXER_FILE); - throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE); - } - - // start indexing task - taskID = indexer.submitTask(indexerSpec); - LOG.info("-------------SUBMITTED TASK"); - - // wait for the task to finish - indexer.waitUntilTaskCompletes(taskID, 10000, 60); - - // wait for segments to be handed off - try { - RetryUtil.retryUntil( - new Callable() - { - @Override - public Boolean call() - { - return coordinator.areSegmentsLoaded(DATASOURCE); - } - }, - true, - 10000, - 30, - "Real-time generated segments loaded" - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - LOG.info("segments are present"); - segmentsExist = true; - - // put the timestamps into the query structure - String queryResponseTemplate; - InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE); - if (null == is) { - throw new ISE("could not open query file: %s", QUERIES_FILE); - } - - try { - queryResponseTemplate = IOUtils.toString(is, "UTF-8"); - } - catch (IOException e) { - throw new ISE(e, "could not read query file: %s", QUERIES_FILE); - } - - String queryStr = queryResponseTemplate; - queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", DATASOURCE); - // time boundary - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast)); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst)); - // time series - queryStr = 
StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst)); - String queryEnd = INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_END%%", queryEnd); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_ADDED%%", Integer.toString(added)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events)); - - // this query will probably be answered from the realtime task - try { - this.queryHelper.testQueriesFromString(queryStr, 2); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @AfterClass - public void afterClass() - { - LOG.info("teardown"); - if (config.manageKafkaTopic()) { - // delete kafka topic - AdminUtils.deleteTopic(zkUtils, TOPIC_NAME); - } - - // remove segments - if (segmentsExist) { - unloadAndKillData(DATASOURCE); - } - } - - public void addFilteredProperties(Properties properties) - { - for (Map.Entry entry : config.getProperties().entrySet()) { - if (entry.getKey().startsWith(testPropertyPrefix)) { - properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue()); - } - } - } -} - diff --git a/integration-tests/src/test/resources/indexer/kafka_index_task.json b/integration-tests/src/test/resources/indexer/kafka_index_task.json deleted file mode 100644 index 55e28c7c47a2..000000000000 --- a/integration-tests/src/test/resources/indexer/kafka_index_task.json +++ /dev/null @@ -1,68 +0,0 @@ -{ - "type" : "index_realtime", - "spec" : { - "dataSchema": { - "dataSource": "%%DATASOURCE%%", - "parser" : { - "type" : "string", - "parseSpec" : { - "format" : "json", - "timestampSpec" : { - "column" : "timestamp", - "format" : "auto" - }, - "dimensionsSpec" : { - "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"], - "dimensionExclusions" : [], - "spatialDimensions" : [] - } - } - }, - "metricsSpec": [ - { - "type": "count", - "name": "count" - }, - { - "type": "doubleSum", - "name": "added", - "fieldName": "added" - }, - { - "type": "doubleSum", - "name": "deleted", - "fieldName": "deleted" - }, - { - "type": "doubleSum", - "name": "delta", - "fieldName": "delta" - } - ], - "granularitySpec": { - "type" : "uniform", - "segmentGranularity": "MINUTE", - "queryGranularity": "NONE" - } - }, - "ioConfig" : { - "type" : "realtime", - "firehose": { - "type": "fixedCount", - "count": "%%COUNT%%", - "delegate": { - "type": "kafka-0.8", - "consumerProps": %%CONSUMER_PROPERTIES%%, - "feed": "%%TOPIC%%" - } - } - }, - "tuningConfig": { - "type" : "realtime", - "maxRowsInMemory": 500000, - "intermediatePersistPeriod": "PT3M", - "windowPeriod": "PT150S", - "basePersistDirectory": "/home/y/var/druid_state/kafka_test/realtime/basePersist" - } - } -} From 97cd9ef47b57a0a0c0315b0ec5d1fb277fe49438 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 19 Nov 2018 16:26:54 -0800 Subject: [PATCH 09/16] Add deprecated in docs for kafka-eight and kafka-simple extensions --- .travis.yml | 5 ----- docs/content/development/extensions.md | 4 ++-- .../kafka/KafkaEightSimpleConsumerDruidModule.java | 1 + .../kafka/KafkaEightSimpleConsumerFirehoseFactory.java | 1 + .../apache/druid/firehose/kafka/KafkaSimpleConsumer.java | 1 + .../apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 8 ++++---- 6 files changed, 9 insertions(+), 11 
deletions(-) diff --git a/.travis.yml b/.travis.yml index 30a54210e73c..1e17711f7060 100644 --- a/.travis.yml +++ b/.travis.yml @@ -122,11 +122,6 @@ matrix: script: - $TRAVIS_BUILD_DIR/ci/travis_script_integration.sh after_failure: - - for v in ~/shared/tasklogs/*.log ; do - if [ "$(grep -c "index_kafka_kafka" $v)" -gt 0 ]; then - echo $v failed-kafka-tasklog ======================== ; cat $v ; - fi - done - for v in ~/shared/logs/*.log ; do echo $v logtail ======================== ; tail -100 $v ; done diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 998f3dcd778a..2f2e2d51e1ef 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -47,7 +47,7 @@ Core extensions are maintained by Druid committers. |druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-extension.html)| |druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)| |druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)| -|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes.|[link](../development/extensions-core/kafka-eight-firehose.html)| +|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes (deprecated).|[link](../development/extensions-core/kafka-eight-firehose.html)| |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)| |druid-kafka-indexing-service|Supervised exactly-once Kafka ingestion for the indexing service.|[link](../development/extensions-core/kafka-ingestion.html)| |druid-kerberos|Kerberos authentication for druid nodes.|[link](../development/extensions-core/druid-kerberos.html)| @@ -79,7 +79,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c |druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.html)| |druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)| |druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)| -|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)| +|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer) (deprecated).|[link](../development/extensions-contrib/kafka-simple.html)| |druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)| |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)| |druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.html)| diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java index a03f3204959b..8fad815177d1 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java +++
b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java @@ -28,6 +28,7 @@ import java.util.List; +@Deprecated public class KafkaEightSimpleConsumerDruidModule implements DruidModule { @Override diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java index a68150a0273b..5007a0e59d65 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +@Deprecated public class KafkaEightSimpleConsumerFirehoseFactory implements FirehoseFactoryV2 { diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java index 038fb2db90f4..25fc8de15daf 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java @@ -56,6 +56,7 @@ * This class is not thread safe, the caller must ensure all the methods be * called from single thread */ +@Deprecated public class KafkaSimpleConsumer { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index cb76e4ac6c99..396694f4d573 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1704,7 +1704,7 @@ public void testRestore() throws Exception Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } - @Test(timeout = 60_0000L) + @Test(timeout = 60_000L) public void testRunWithPauseAndResume() throws Exception { final KafkaIndexTask task = createTask( @@ -1992,10 +1992,10 @@ public void testRunTransactionModeRollback() throws Exception } final QuerySegmentSpec endInterval = objectMapper.readValue( - "\"2012/2050\"", QuerySegmentSpec.class + "\"2008/2049\"", QuerySegmentSpec.class ); - //Iterable scanResultValues1 = scanData(task, endInterval); - //Assert.assertEquals(2, Iterables.size(scanResultValues1)); + Iterable scanResultValues1 = scanData(task, endInterval); + Assert.assertEquals(2, Iterables.size(scanResultValues1)); Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets()); From 7b72c8df72b5d1288866a4b030a4aa2b634114c6 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 4 Jan 2019 17:09:16 -0800 Subject: [PATCH 10/16] Remove skipOffsetGaps and code changes for transaction support --- .../kafka/KafkaIndexTaskIOConfig.java | 5 +---- 
.../indexing/kafka/KafkaRecordSupplier.java | 3 ++- .../SeekableStreamIndexTask.java | 2 +- .../SeekableStreamIndexTaskRunner.java | 21 +++++-------------- .../common/OrderedPartitionableRecord.java | 2 +- .../seekablestream/common/RecordSupplier.java | 2 +- .../supervisor/SeekableStreamSupervisor.java | 2 +- 7 files changed, 12 insertions(+), 25 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index fc5c28751bc9..846b284142a3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -42,8 +42,7 @@ public KafkaIndexTaskIOConfig( @JsonProperty("consumerProperties") Map consumerProperties, @JsonProperty("useTransaction") Boolean useTransaction, @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, - @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, - @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps + @JsonProperty("maximumMessageTime") DateTime maximumMessageTime ) { super( @@ -54,7 +53,6 @@ public KafkaIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - skipOffsetGaps, null ); @@ -89,7 +87,6 @@ public String toString() ", useTransaction=" + isUseTransaction() + ", minimumMessageTime=" + getMinimumMessageTime() + ", maximumMessageTime=" + getMaximumMessageTime() + - ", skipOffsetGaps=" + isSkipOffsetGaps() + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 935404cbc7c2..6c3d053a3bb5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import javax.annotation.Nonnull; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -110,7 +111,7 @@ public Set> getAssignment() public List> poll(long timeout) { List> polledRecords = new ArrayList<>(); - for (ConsumerRecord record : consumer.poll(timeout)) { + for (ConsumerRecord record : consumer.poll(Duration.ofMillis(timeout))) { polledRecords.add(new OrderedPartitionableRecord<>( record.topic(), record.partition(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 7f279917a3f1..776579b43628 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -63,7 +63,7 @@ import java.util.concurrent.ThreadLocalRandom; -public abstract class SeekableStreamIndexTask extends AbstractTask +public abstract class SeekableStreamIndexTask extends AbstractTask implements ChatHandler { public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15; diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index e85de9d8db68..f47c8632c160 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -129,7 +129,7 @@ * @param Partition Number Type * @param Sequence Number Type */ -public abstract class SeekableStreamIndexTaskRunner implements ChatHandler +public abstract class SeekableStreamIndexTaskRunner implements ChatHandler { public enum Status { @@ -473,7 +473,7 @@ public void run() } // if stop is requested or task's end sequence is set by call to setEndOffsets method with finish set to true - if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) { + if (stopRequested.get() || sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) { status = Status.PUBLISHING; } @@ -503,10 +503,11 @@ public void run() SequenceMetadata sequenceToCheckpoint = null; for (OrderedPartitionableRecord record : records) { + // for Kafka, the end offsets are exclusive, so skip it if (isEndSequenceOffsetsExclusive() && createSequenceNumber(record.getSequenceNumber()).compareTo( - createSequenceNumber(endOffsets.get(record.getPartitionId()))) == 0) { + createSequenceNumber(endOffsets.get(record.getPartitionId()))) >= 0) { continue; } @@ -529,17 +530,6 @@ public void run() } else if (createSequenceNumber(record.getSequenceNumber()).compareTo( createSequenceNumber(endOffsets.get(record.getPartitionId()))) <= 0) {
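The check removed in the hunk below treated any non-consecutive offset as an error. On a transactional topic that assumption no longer holds: commit and abort control markers occupy offsets but are never delivered to consumers, so legitimate gaps appear in the offset sequence, and the exclusive end-offset comparison above becomes ">=" rather than "==" because the consumer's position can jump past the end offset. A simplified sketch of the resulting filtering rule, with illustrative names (PartitionFilter and shouldProcess are not Druid APIs):

import java.util.Map;

class PartitionFilter
{
  // partition -> exclusive end offset for the sequence being read
  private final Map<Integer, Long> endOffsets;

  PartitionFilter(Map<Integer, Long> endOffsets)
  {
    this.endOffsets = endOffsets;
  }

  boolean shouldProcess(int partition, long offset)
  {
    // Records at or past the exclusive end offset are skipped; anything before
    // it is processed, even when earlier offsets were never observed (gaps left
    // by transaction markers are expected, not an error).
    return offset < endOffsets.get(partition);
  }
}

- - if (!record.getSequenceNumber().equals(currOffsets.get(record.getPartitionId())) - && !ioConfig.isSkipOffsetGaps()) { - throw new ISE( - "WTF?! 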
Got sequence[%s] after sequence[%s] in partition[%s].", - record.getSequenceNumber(), - currOffsets.get(record.getPartitionId()), - record.getPartitionId() - ); - } - try { final List valueBytess = record.getData(); final List rows; @@ -633,7 +623,6 @@ public void onFailure(Throwable t) // here for kafka is to +1 while for kinesis we simply save the current sequence number currOffsets.put(record.getPartitionId(), getSequenceNumberToStoreAfterRead(record.getSequenceNumber())); } - if ((currOffsets.get(record.getPartitionId()).equals(endOffsets.get(record.getPartitionId())) || isEndOfShard(currOffsets.get(record.getPartitionId()))) && assignment.remove(record.getStreamPartition())) { @@ -1903,7 +1892,7 @@ private boolean verifyInitialRecordAndSkipExclusivePartition( ) { if (intialSequenceSnapshot.containsKey(record.getPartitionId())) { - if (!intialSequenceSnapshot.get(record.getPartitionId()).equals(record.getSequenceNumber())) { + if (record.getSequenceNumber().compareTo(intialSequenceSnapshot.get(record.getPartitionId())) < 0) { throw new ISE( "Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]", record.getSequenceNumber(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java index 4dd653e760ff..a122d9e15213 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java @@ -35,7 +35,7 @@ * @param partition id * @param sequence number */ -public class OrderedPartitionableRecord +public class OrderedPartitionableRecord { private final String stream; private final PartitionIdType partitionId; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java index d9e599da0c80..3a6e87ed3b24 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java @@ -36,7 +36,7 @@ * @param Sequence Number Type */ @Beta -public interface RecordSupplier extends Closeable +public interface RecordSupplier extends Closeable { /** * assigns the given partitions to this RecordSupplier diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ddd854d3bdad..0629aa879b4b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -119,7 +119,7 @@ * @param the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type * @param the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers */ -public abstract class SeekableStreamSupervisor +public abstract class SeekableStreamSupervisor implements Supervisor { public static final String 
IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED"; From d1f06f029e919c39a58b982ef7ca7240432b09d7 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 4 Jan 2019 17:29:22 -0800 Subject: [PATCH 11/16] Fix indentation --- .../indexing/kafka/KafkaIndexTaskTest.java | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 47458f3431f2..4d378cbf20e0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -651,34 +651,34 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception ) ); - final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( - topic, - ImmutableMap.of( - 0, - 10L, - 1, - 2L - ) - ); - final KafkaIndexTask task = createTask( - null, - new KafkaIndexTaskIOConfig( - 0, - baseSequenceName, - startPartitions, - endPartitions, - consumerProps, - KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, - true, - null, - null - ) - ); - final ListenableFuture future = runTask(task); - while (task.getRunner().getStatus() != Status.PAUSED) { - Thread.sleep(10); - } - final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( + topic, + ImmutableMap.of( + 0, + 10L, + 1, + 2L + ) + ); + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + baseSequenceName, + startPartitions, + endPartitions, + consumerProps, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null + ) + ); + final ListenableFuture future = runTask(task); + while (task.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets)); task.getRunner().setEndOffsets(currentOffsets, false); From c89ddfb8a399ff33725ab48f42cd6a499248a09f Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 4 Jan 2019 21:49:03 -0800 Subject: [PATCH 12/16] remove skipOffsetGaps from kinesis --- .../apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java | 1 - .../org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 307e971bcfd3..8dd32f246342 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -68,7 +68,6 @@ public KinesisIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - true, exclusiveStartSequenceNumberPartitions ); Preconditions.checkArgument(endPartitions.getPartitionSequenceNumberMap() diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java index fff34f946b26..ab782299ee79 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java @@ -89,7 +89,6 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNull(config.getAwsAssumedRoleArn()); Assert.assertNull(config.getAwsExternalId()); Assert.assertFalse(config.isDeaggregate()); - Assert.assertTrue(config.isSkipOffsetGaps()); } @Test @@ -146,7 +145,6 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals("role", config.getAwsAssumedRoleArn()); Assert.assertEquals("awsexternalid", config.getAwsExternalId()); Assert.assertTrue(config.isDeaggregate()); - Assert.assertTrue(config.isSkipOffsetGaps()); } @Test From d6d5b3239021e00fdb9df50d963880033a9ff92e Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sat, 5 Jan 2019 20:51:13 -0800 Subject: [PATCH 13/16] Add transaction api to KafkaRecordSupplierTest --- .../kafka/KafkaRecordSupplierTest.java | 63 +++++++++---------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index 2f445aa99431..105dca4e0d60 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -166,11 +166,7 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException { // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); Set> partitions = ImmutableSet.of( StreamPartition.of(topic, 0), @@ -195,11 +191,7 @@ public void testPoll() throws InterruptedException, ExecutionException { // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); Set> partitions = ImmutableSet.of( StreamPartition.of(topic, 0), @@ -232,10 +224,13 @@ public void testPoll() throws InterruptedException, ExecutionException public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionException { // Insert data - - KafkaProducer producer = kafkaServer.newProducer(); - for (ProducerRecord record : records.subList(0, 13)) { - producer.send(record).get();
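These tests now push data through Kafka's transactional producer API. For reference, a minimal self-contained sketch of the produce-side pattern (the broker address, topic name, and transactional.id below are illustrative):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

class TransactionalProduceExample
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // illustrative address
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());
    props.put("enable.idempotence", "true");          // required for transactions
    props.put("transactional.id", "example-tx-id");   // unique per logical producer

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      producer.initTransactions(); // registers the transactional.id and fences older producers
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("example-topic", "event".getBytes(StandardCharsets.UTF_8)));
      producer.commitTransaction(); // writes a commit marker into every partition the transaction touched
    }
  }
}

The commit marker takes up one offset in each partition the transaction wrote to, which is why offset expectations in this series shift up by one (for example, 1100 becoming 1101 in testLatestOffset, and seekToLatest landing at 12 rather than 11 in testPosition below).

+ try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) 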
From d6d5b3239021e00fdb9df50d963880033a9ff92e Mon Sep 17 00:00:00 2001
From: Surekha Saharan
Date: Sat, 5 Jan 2019 20:51:13 -0800
Subject: [PATCH 13/16] Add transaction api to KafkaRecordSupplierTest

---
 .../kafka/KafkaRecordSupplierTest.java        | 63 +++++++++----------
 1 file changed, 31 insertions(+), 32 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 2f445aa99431..105dca4e0d60 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -166,11 +166,7 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
         StreamPartition.of(topic, 0),
@@ -195,11 +191,7 @@ public void testPoll() throws InterruptedException, ExecutionException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
         StreamPartition.of(topic, 0),
@@ -232,10 +224,13 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE
   {
     // Insert data
-
-    KafkaProducer<byte[], byte[]> producer = kafkaServer.newProducer();
-    for (ProducerRecord<byte[], byte[]> record : records.subList(0, 13)) {
-      producer.send(record).get();
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records.subList(0, 13)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
     }
 
     Set<StreamPartition<Integer>> partitions = ImmutableSet.of(
@@ -257,8 +252,13 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE
     }
 
     // Insert data
-    for (ProducerRecord<byte[], byte[]> rec : records.subList(13, 15)) {
-      producer.send(rec).get();
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records.subList(13, 15)) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
     }
 
@@ -280,11 +280,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE
   public void testSeek() throws InterruptedException, ExecutionException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+      insertData();
 
     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
@@ -326,11 +322,7 @@ public void testSeek() throws InterruptedException, ExecutionException
   public void testSeekToLatest() throws InterruptedException, ExecutionException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
@@ -388,11 +380,7 @@ public void testSeekUnassigned() throws InterruptedException, ExecutionException
   public void testPosition() throws ExecutionException, InterruptedException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();
 
     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
@@ -420,7 +408,7 @@ public void testPosition() throws ExecutionException, InterruptedException
     Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition0));
 
     recordSupplier.seekToLatest(Collections.singleton(partition0));
-    Assert.assertEquals(11L, (long) recordSupplier.getPosition(partition0));
+    Assert.assertEquals(12L, (long) recordSupplier.getPosition(partition0));
 
     long prevPos = recordSupplier.getPosition(partition0);
     recordSupplier.getEarliestSequenceNumber(partition0);
@@ -433,5 +421,16 @@
     recordSupplier.close();
   }
+  private void insertData() throws ExecutionException, InterruptedException
+  {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+  }
 }
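The testPosition change above (seekToLatest now lands at 12L rather than 11L) follows from the commit marker noted earlier: with the test data written in one committed transaction, the marker sits one offset past the last data record, so the partition's end offset grows by one. A consumer-side sketch of observing that, again with illustrative connection values rather than anything from the patch:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class EndOffsetSketch
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
    // read_committed skips aborted records and transaction markers while polling.
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

    try (KafkaConsumer<byte[], byte[]> consumer =
             new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
      final TopicPartition tp = new TopicPartition("sketch-topic", 0);
      // If eleven data records occupy offsets 0..10 and one transaction was committed,
      // the marker takes offset 11 and the reported end offset is 12.
      final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(tp));
      System.out.println("end offset: " + endOffsets.get(tp));
    }
  }
}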
From e411d68e85be4cedcd81d4be8f4c31242410d3bb Mon Sep 17 00:00:00 2001
From: Surekha Saharan
Date: Sat, 5 Jan 2019 22:00:56 -0800
Subject: [PATCH 14/16] Fix indent

---
 .../apache/druid/indexing/kafka/KafkaRecordSupplierTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 105dca4e0d60..045d8df47986 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -280,7 +280,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE
   public void testSeek() throws InterruptedException, ExecutionException
   {
     // Insert data
-      insertData();
+    insertData();
 
     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);

From 9445d928d4fa900b3d7d3ac8da4886d96bca95d8 Mon Sep 17 00:00:00 2001
From: Surekha Saharan
Date: Mon, 4 Feb 2019 14:16:17 -0800
Subject: [PATCH 15/16] Fix test

---
 .../kafka/KafkaRecordSupplierTest.java        | 22 ++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index 045d8df47986..f944bf04610b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -270,8 +270,28 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE
     List<OrderedPartitionableRecord<Integer, Long>> initialRecords = createOrderedPartitionableRecords();
 
     Assert.assertEquals(records.size(), polledRecords.size());
-    Assert.assertTrue(initialRecords.containsAll(polledRecords));
+
     Assert.assertEquals(partitions, recordSupplier.getAssignment());
+    final int initialRecordsPartition0Size = initialRecords.stream()
+                                                           .filter(r -> r.getPartitionId().equals(0))
+                                                           .collect(Collectors.toSet())
+                                                           .size();
+    final int initialRecordsPartition1Size = initialRecords.stream()
+                                                           .filter(r -> r.getPartitionId().equals(1))
+                                                           .collect(Collectors.toSet())
+                                                           .size();
+
+    final int polledRecordsPartition0Size = polledRecords.stream()
+                                                         .filter(r -> r.getPartitionId().equals(0))
+                                                         .collect(Collectors.toSet())
+                                                         .size();
+    final int polledRecordsPartition1Size = polledRecords.stream()
+                                                         .filter(r -> r.getPartitionId().equals(1))
+                                                         .collect(Collectors.toSet())
+                                                         .size();
+
+    Assert.assertEquals(initialRecordsPartition0Size, polledRecordsPartition0Size);
+    Assert.assertEquals(initialRecordsPartition1Size, polledRecordsPartition1Size);
 
     recordSupplier.close();
   }
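Patch 15 above replaces the exact containsAll assertion with per-partition size comparisons. Once transaction markers are interleaved with data, the absolute offsets recorded in the precomputed expected records no longer match what is polled, but the number of data records per partition is still deterministic, so that is what the test pins down. A compact analogue of the check, using a hypothetical record type cut down to the single field the comparison needs:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PerPartitionCountSketch
{
  // Hypothetical stand-in for the test's record type; only the partition id matters here.
  static class Rec
  {
    final int partitionId;

    Rec(int partitionId)
    {
      this.partitionId = partitionId;
    }
  }

  // Count records per partition in one pass instead of one filter/collect per partition.
  static Map<Integer, Long> countByPartition(List<Rec> recs)
  {
    return recs.stream().collect(Collectors.groupingBy(r -> r.partitionId, Collectors.counting()));
  }

  public static void main(String[] args)
  {
    final List<Rec> expected = Arrays.asList(new Rec(0), new Rec(0), new Rec(1));
    final List<Rec> polled = Arrays.asList(new Rec(1), new Rec(0), new Rec(0));
    // Same outcome as the per-partition size asserts in the patch: counts agree even
    // though ordering (and, in the real test, offsets) differ.
    System.out.println(countByPartition(expected).equals(countByPartition(polled))); // true
  }
}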
From 581a33a6058fcc5383537c325c0a26267882099b Mon Sep 17 00:00:00 2001
From: Surekha Saharan
Date: Mon, 4 Feb 2019 14:41:20 -0800
Subject: [PATCH 16/16] update kafka version to 2.1.0

---
 extensions-core/kafka-indexing-service/pom.xml                 | 2 +-
 .../indexing/seekablestream/SeekableStreamIndexTaskRunner.java | 1 +
 integration-tests/pom.xml                                      | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml
index c8170d47c797..cd99c673a3d3 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -34,7 +34,7 @@
-        <apache.kafka.version>2.0.1</apache.kafka.version>
+        <apache.kafka.version>2.1.0</apache.kafka.version>

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 348fbdfebd15..8fa6cde137f4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -624,6 +624,7 @@ public void onFailure(Throwable t)
               // here for kafka is to +1 while for kinesis we simply save the current sequence number
               currOffsets.put(record.getPartitionId(), getSequenceNumberToStoreAfterRead(record.getSequenceNumber()));
             }
+
             if ((currOffsets.get(record.getPartitionId()).equals(endOffsets.get(record.getPartitionId()))
                 || isEndOfShard(currOffsets.get(record.getPartitionId())))
                 && assignment.remove(record.getStreamPartition())) {

diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index f66ba496cda7..53e01d2a796b 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -32,7 +32,7 @@
-        <apache.kafka.version>2.0.1</apache.kafka.version>
+        <apache.kafka.version>2.1.0</apache.kafka.version>
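The runner comment touched above ("for kafka is to +1 while for kinesis we simply save the current sequence number") encodes the one behavioral split the shared SeekableStreamIndexTaskRunner keeps: Kafka offsets are dense longs, so the value stored after reading a record is the next offset to fetch, while Kinesis sequence numbers are opaque strings stored verbatim, with the consumer resuming after the stored value. A sketch of that rule, with illustrative names rather than Druid's actual API:

// Illustrative only: mirrors the "+1 for kafka, save as-is for kinesis" rule from the
// runner comment above; method names and types here are hypothetical.
final class SequenceToStoreSketch
{
  private SequenceToStoreSketch()
  {
  }

  static long kafkaOffsetToStoreAfterRead(long readOffset)
  {
    return readOffset + 1; // the next offset the consumer should fetch
  }

  static String kinesisSequenceToStoreAfterRead(String readSequenceNumber)
  {
    return readSequenceNumber; // resumed with after-sequence-number semantics
  }

  public static void main(String[] args)
  {
    System.out.println(kafkaOffsetToStoreAfterRead(11L)); // prints 12
    System.out.println(kinesisSequenceToStoreAfterRead("example-sequence-number"));
  }
}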