diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index b24e8748538d..85cebc5ae277 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -201,7 +201,6 @@ For Roaring bitmaps:
 |`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
-|`skipOffsetGaps`|Boolean|Whether or not to allow gaps of missing offsets in the Kafka stream. This is required for compatibility with implementations such as MapR Streams which does not guarantee consecutive offsets. If this is false, an exception will be thrown if offsets are not consecutive.|no (default == false)|
 ## Operations
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index a5b8a5b3f57a..a6d3a7b3913d 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -48,7 +48,7 @@ Core extensions are maintained by Druid committers.
 |druid-datasketches|Support for approximate counts and set operations with [DataSketches](http://datasketches.github.io/).|[link](../development/extensions-core/datasketches-extension.html)|
 |druid-hdfs-storage|HDFS deep storage.|[link](../development/extensions-core/hdfs.html)|
 |druid-histogram|Approximate histograms and quantiles aggregator.|[link](../development/extensions-core/approximate-histograms.html)|
-|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes.|[link](../development/extensions-core/kafka-eight-firehose.html)|
+|druid-kafka-eight|Kafka ingest firehose (high level consumer) for realtime nodes (deprecated).|[link](../development/extensions-core/kafka-eight-firehose.html)|
 |druid-kafka-extraction-namespace|Kafka-based namespaced lookup. Requires namespace lookup extension.|[link](../development/extensions-core/kafka-extraction-namespace.html)|
 |druid-kafka-indexing-service|Supervised exactly-once Kafka ingestion for the indexing service.|[link](../development/extensions-core/kafka-ingestion.html)|
 |druid-kinesis-indexing-service|Supervised exactly-once Kinesis ingestion for the indexing service.|[link](../development/extensions-core/kinesis-ingestion.html)|
@@ -81,7 +81,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
 |druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.html)|
 |druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
 |druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
-|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
+|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer) (deprecated).|[link](../development/extensions-contrib/kafka-simple.html)|
 |druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)|
 |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
 |druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.html)|
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java
index a03f3204959b..8fad815177d1 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java
@@ -28,6 +28,7 @@
 import java.util.List;
+@Deprecated
 public class KafkaEightSimpleConsumerDruidModule implements DruidModule
 {
   @Override
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
index 65c501a8e542..ca34e55d6a7d 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
+++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
@@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+@Deprecated
 public class KafkaEightSimpleConsumerFirehoseFactory implements FirehoseFactoryV2
 {
diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java
index 038fb2db90f4..25fc8de15daf 100644
--- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java
+++
b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/org/apache/druid/firehose/kafka/KafkaSimpleConsumer.java @@ -56,6 +56,7 @@ * This class is not thread safe, the caller must ensure all the methods be * called from single thread */ +@Deprecated public class KafkaSimpleConsumer { diff --git a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java index 4fe379a48830..f8a5ad7d3736 100644 --- a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java +++ b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightDruidModule.java @@ -30,6 +30,7 @@ /** */ +@Deprecated public class KafkaEightDruidModule implements DruidModule { @Override diff --git a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java index 0d0ed5863dd2..bb38c050a14a 100644 --- a/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java +++ b/extensions-core/kafka-eight/src/main/java/org/apache/druid/firehose/kafka/KafkaEightFirehoseFactory.java @@ -47,7 +47,9 @@ import java.util.Set; /** + * This class is deprecated and kafka-eight module should be removed completely */ +@Deprecated public class KafkaEightFirehoseFactory implements FirehoseFactory> { private static final Logger log = new Logger(KafkaEightFirehoseFactory.class); diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml index c48091f16697..b7c781693235 100644 --- a/extensions-core/kafka-indexing-service/pom.xml +++ b/extensions-core/kafka-indexing-service/pom.xml @@ -34,7 +34,7 @@ - 0.10.2.2 + 2.1.0 diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index 950441c3c591..4fcbbe10f0c0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -108,6 +108,7 @@ KafkaConsumer newConsumer() props.setProperty("auto.offset.reset", "none"); props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName()); props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName()); + props.setProperty("isolation.level", "read_committed"); return new KafkaConsumer<>(props); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index af84bfcd4ab1..5f3681623fab 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -45,8 +45,7 @@ public KafkaIndexTaskIOConfig( @JsonProperty("pollTimeout") Long pollTimeout, @JsonProperty("useTransaction") Boolean useTransaction, @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, - @JsonProperty("maximumMessageTime") 
DateTime maximumMessageTime, - @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps + @JsonProperty("maximumMessageTime") DateTime maximumMessageTime ) { super( @@ -57,7 +56,6 @@ public KafkaIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - skipOffsetGaps, null ); @@ -100,7 +98,6 @@ public String toString() ", useTransaction=" + isUseTransaction() + ", minimumMessageTime=" + getMinimumMessageTime() + ", maximumMessageTime=" + getMaximumMessageTime() + - ", skipOffsetGaps=" + isSkipOffsetGaps() + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 935404cbc7c2..6c3d053a3bb5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import javax.annotation.Nonnull; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -110,7 +111,7 @@ public Set> getAssignment() public List> poll(long timeout) { List> polledRecords = new ArrayList<>(); - for (ConsumerRecord record : consumer.poll(timeout)) { + for (ConsumerRecord record : consumer.poll(Duration.ofMillis(timeout))) { polledRecords.add(new OrderedPartitionableRecord<>( record.topic(), record.partition(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java index d081a0e0d796..c82700d96260 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java @@ -413,23 +413,6 @@ public void run() } if (record.offset() < endOffsets.get(record.partition())) { - if (record.offset() != nextOffsets.get(record.partition())) { - if (ioConfig.isSkipOffsetGaps()) { - log.warn( - "Skipped to offset[%,d] after offset[%,d] in partition[%d].", - record.offset(), - nextOffsets.get(record.partition()), - record.partition() - ); - } else { - throw new ISE( - "WTF?! 
Got offset[%,d] after offset[%,d] in partition[%d].", - record.offset(), - nextOffsets.get(record.partition()), - record.partition() - ); - } - } try { final byte[] valueBytes = record.value(); @@ -489,7 +472,7 @@ public void run() nextOffsets.put(record.partition(), record.offset() + 1); } - if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition())) + if (nextOffsets.get(record.partition()) >= (endOffsets.get(record.partition())) && assignment.remove(record.partition())) { log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); KafkaIndexTask.assignPartitions(consumer, topic, assignment); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 0e9dcacc8153..bb388cd55e25 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -212,8 +212,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( kafkaIoConfig.getPollTimeout(), true, minimumMessageTime, - maximumMessageTime, - kafkaIoConfig.isSkipOffsetGaps() + maximumMessageTime ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index ddd0f06d205f..629daa780ed9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -38,7 +38,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig private final Map consumerProperties; private final long pollTimeout; - private final boolean skipOffsetGaps; + @JsonCreator public KafkaSupervisorIOConfig( @@ -53,8 +53,7 @@ public KafkaSupervisorIOConfig( @JsonProperty("useEarliestOffset") Boolean useEarliestOffset, @JsonProperty("completionTimeout") Period completionTimeout, @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod, - @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, - @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps + @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod ) { super( @@ -76,7 +75,6 @@ public KafkaSupervisorIOConfig( StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY) ); this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS; - this.skipOffsetGaps = skipOffsetGaps != null ? 
skipOffsetGaps : false; } @JsonProperty @@ -103,12 +101,6 @@ public boolean isUseEarliestOffset() return isUseEarliestSequenceNumber(); } - @JsonProperty - public boolean isSkipOffsetGaps() - { - return skipOffsetGaps; - } - @Override public String toString() { @@ -125,7 +117,6 @@ public String toString() ", completionTimeout=" + getCompletionTimeout() + ", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() + ", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() + - ", skipOffsetGaps=" + skipOffsetGaps + '}'; } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java index 7ce8df024443..556cba1a39be 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIOConfigTest.java @@ -77,7 +77,6 @@ public void testSerdeWithDefaults() throws Exception Assert.assertTrue(config.isUseTransaction()); Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent()); Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent()); - Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps()); Assert.assertEquals(Collections.EMPTY_SET, config.getExclusiveStartSequenceNumberPartitions()); } @@ -93,8 +92,7 @@ public void testSerdeWithNonDefaults() throws Exception + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" - + " \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n" - + " \"skipOffsetGaps\": true\n" + + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; KafkaIndexTaskIOConfig config = (KafkaIndexTaskIOConfig) mapper.readValue( @@ -115,9 +113,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertFalse(config.isUseTransaction()); Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get()); Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get()); - Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps()); Assert.assertEquals(Collections.EMPTY_SET, config.getExclusiveStartSequenceNumberPartitions()); - } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index b7b389668a46..3d6308cb0a96 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -99,14 +99,16 @@ import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.Druids; import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; +import 
org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.Result; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -114,6 +116,13 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.scan.ScanResultValue; +import org.apache.druid.query.spec.QuerySegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; @@ -172,11 +181,14 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.apache.druid.query.QueryPlus.wrap; + @RunWith(Parameterized.class) public class KafkaIndexTaskTest { @@ -372,11 +384,7 @@ public static void tearDownClass() throws Exception public void testRunAfterDataInserted() throws Exception { // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -389,8 +397,7 @@ public void testRunAfterDataInserted() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -432,8 +439,7 @@ public void testRunBeforeDataInserted() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -445,11 +451,7 @@ public void testRunBeforeDataInserted() throws Exception } // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -484,11 +486,7 @@ public void testIncrementalHandOff() throws Exception maxRowsPerSegment = 2; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); Map consumerProps = kafkaServer.consumerProperties(); consumerProps.put("max.poll.records", "1"); @@ -541,8 +539,7 @@ public void testIncrementalHandOff() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); final ListenableFuture future = runTask(task); @@ -614,141 +611,148 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception int numToAdd = records.size() - 2; try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (int i = 0; i < numToAdd; i++) { 
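        // Each send(...).get() below waits for the broker to acknowledge the record;
        // because the producer is now transactional, none of these records become
        // visible to the task's read_committed consumer until commitTransaction() runs.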
kafkaProducer.send(records.get(i)).get(); } + kafkaProducer.commitTransaction(); + } - Map consumerProps = kafkaServer.consumerProperties(); - consumerProps.put("max.poll.records", "1"); + Map consumerProps = kafkaServer.consumerProperties(); + consumerProps.put("max.poll.records", "1"); - final SeekableStreamPartitions startPartitions = new SeekableStreamPartitions<>( - topic, - ImmutableMap.of( - 0, - 0L, - 1, - 0L - ) - ); - final SeekableStreamPartitions checkpoint1 = new SeekableStreamPartitions<>( - topic, - ImmutableMap.of( - 0, - 3L, - 1, - 0L - ) - ); - final SeekableStreamPartitions checkpoint2 = new SeekableStreamPartitions<>( - topic, - ImmutableMap.of( - 0, - 10L, - 1, - 0L - ) - ); + final SeekableStreamPartitions startPartitions = new SeekableStreamPartitions<>( + topic, + ImmutableMap.of( + 0, + 0L, + 1, + 0L + ) + ); + final SeekableStreamPartitions checkpoint1 = new SeekableStreamPartitions<>( + topic, + ImmutableMap.of( + 0, + 3L, + 1, + 0L + ) + ); + final SeekableStreamPartitions checkpoint2 = new SeekableStreamPartitions<>( + topic, + ImmutableMap.of( + 0, + 10L, + 1, + 0L + ) + ); - final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( - topic, - ImmutableMap.of( - 0, - 10L, - 1, - 2L - ) - ); - final KafkaIndexTask task = createTask( - null, - new KafkaIndexTaskIOConfig( - 0, - baseSequenceName, - startPartitions, - endPartitions, - consumerProps, - KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, - true, - null, - null, - false - ) - ); - final ListenableFuture future = runTask(task); - while (task.getRunner().getStatus() != Status.PAUSED) { - Thread.sleep(10); - } - final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); + final SeekableStreamPartitions endPartitions = new SeekableStreamPartitions<>( + topic, + ImmutableMap.of( + 0, + 10L, + 1, + 2L + ) + ); + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + baseSequenceName, + startPartitions, + endPartitions, + consumerProps, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null + ) + ); + final ListenableFuture future = runTask(task); + while (task.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } + final Map currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); - Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets)); - task.getRunner().setEndOffsets(currentOffsets, false); + Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets)); + task.getRunner().setEndOffsets(currentOffsets, false); - while (task.getRunner().getStatus() != Status.PAUSED) { - Thread.sleep(10); - } + while (task.getRunner().getStatus() != Status.PAUSED) { + Thread.sleep(10); + } - // add remaining records + // add remaining records + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (int i = numToAdd; i < records.size(); i++) { kafkaProducer.send(records.get(i)).get(); } - final Map nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); - - Assert.assertTrue(checkpoint2.getPartitionSequenceNumberMap().equals(nextOffsets)); - task.getRunner().setEndOffsets(nextOffsets, false); - - Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - - Assert.assertEquals(2, checkpointRequestsHash.size()); - Assert.assertTrue( - checkpointRequestsHash.contains( - Objects.hash( - DATA_SCHEMA.getDataSource(), - 0, 
- new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, currentOffsets)) - ) - ) - ); - Assert.assertTrue( - checkpointRequestsHash.contains( - Objects.hash( - DATA_SCHEMA.getDataSource(), - 0, - new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, currentOffsets)), - new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, nextOffsets)) - ) - ) - ); + kafkaProducer.commitTransaction(); + } + final Map nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets()); - // Check metrics - Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed()); - Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable()); - Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway()); - - // Check published metadata - SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); - SegmentDescriptor desc2 = SD(task, "2009/P1D", 0); - SegmentDescriptor desc3 = SD(task, "2010/P1D", 0); - SegmentDescriptor desc4 = SD(task, "2011/P1D", 0); - SegmentDescriptor desc5 = SD(task, "2011/P1D", 1); - SegmentDescriptor desc6 = SD(task, "2012/P1D", 0); - SegmentDescriptor desc7 = SD(task, "2013/P1D", 0); - Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors()); - Assert.assertEquals( - new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))), - metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) - ); + Assert.assertTrue(checkpoint2.getPartitionSequenceNumberMap().equals(nextOffsets)); + task.getRunner().setEndOffsets(nextOffsets, false); - // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1)); - Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2)); - Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3)); - Assert.assertTrue((ImmutableList.of("d", "e").equals(readSegmentColumn("dim1", desc4)) - && ImmutableList.of("h").equals(readSegmentColumn("dim1", desc5))) || - (ImmutableList.of("d", "h").equals(readSegmentColumn("dim1", desc4)) - && ImmutableList.of("e").equals(readSegmentColumn("dim1", desc5)))); - Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc6)); - Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7)); - } + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + + Assert.assertEquals(2, checkpointRequestsHash.size()); + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, currentOffsets)) + ) + ) + ); + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(new SeekableStreamPartitions(topic, currentOffsets)), + new KafkaDataSourceMetadata(new SeekableStreamPartitions(topic, nextOffsets)) + ) + ) + ); + + // Check metrics + Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2009/P1D", 0); + SegmentDescriptor desc3 = 
SD(task, "2010/P1D", 0); + SegmentDescriptor desc4 = SD(task, "2011/P1D", 0); + SegmentDescriptor desc5 = SD(task, "2011/P1D", 1); + SegmentDescriptor desc6 = SD(task, "2012/P1D", 0); + SegmentDescriptor desc7 = SD(task, "2013/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc3)); + Assert.assertTrue((ImmutableList.of("d", "e").equals(readSegmentColumn("dim1", desc4)) + && ImmutableList.of("h").equals(readSegmentColumn("dim1", desc5))) || + (ImmutableList.of("d", "h").equals(readSegmentColumn("dim1", desc4)) + && ImmutableList.of("e").equals(readSegmentColumn("dim1", desc5)))); + Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc6)); + Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7)); } @Test(timeout = 60_000L) @@ -763,11 +767,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception intermediateHandoffPeriod = new Period().withSeconds(0); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records.subList(0, 2)) { - kafkaProducer.send(record).get(); - } - } + insertData(); Map consumerProps = kafkaServer.consumerProperties(); consumerProps.put("max.poll.records", "1"); @@ -810,8 +810,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); final ListenableFuture future = runTask(task); @@ -882,9 +881,12 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception // Insert data try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (ProducerRecord record : records) { kafkaProducer.send(record).get(); } + kafkaProducer.commitTransaction(); } Map consumerProps = kafkaServer.consumerProperties(); consumerProps.put("max.poll.records", "1"); @@ -913,8 +915,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); final ListenableFuture future = runTask(task); @@ -950,8 +951,7 @@ public void testRunWithMinimumMessageTime() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, DateTimes.of("2010"), - null, - false + null ) ); @@ -963,11 +963,7 @@ public void testRunWithMinimumMessageTime() throws Exception } // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -1005,8 +1001,7 @@ public void testRunWithMaximumMessageTime() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - DateTimes.of("2010"), - false + DateTimes.of("2010") ) ); @@ -1018,11 +1013,7 @@ public void testRunWithMaximumMessageTime() throws Exception } // 
Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -1070,8 +1061,7 @@ public void testRunWithTransformSpec() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1083,11 +1073,7 @@ public void testRunWithTransformSpec() throws Exception } // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for task to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -1114,11 +1100,7 @@ public void testRunWithTransformSpec() throws Exception public void testRunOnNothing() throws Exception { // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1131,8 +1113,7 @@ public void testRunOnNothing() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1156,11 +1137,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception handoffConditionTimeout = 5_000; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1173,8 +1150,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1209,11 +1185,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio handoffConditionTimeout = 100; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1226,8 +1198,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1265,11 +1236,7 @@ public void testReportParseExceptions() throws Exception maxSavedParseExceptions = 2; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1282,8 +1249,7 @@ public void testReportParseExceptions() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1310,11 +1276,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception maxSavedParseExceptions = 6; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1327,8 +1289,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1393,11 +1354,7 @@ public void 
testMultipleParseExceptionsFailure() throws Exception maxSavedParseExceptions = 2; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -1410,8 +1367,7 @@ public void testMultipleParseExceptionsFailure() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1471,8 +1427,7 @@ public void testRunReplicas() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); final KafkaIndexTask task2 = createTask( @@ -1486,8 +1441,7 @@ public void testRunReplicas() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1495,11 +1449,7 @@ public void testRunReplicas() throws Exception final ListenableFuture future2 = runTask(task2); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for tasks to exit Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode()); @@ -1541,8 +1491,7 @@ public void testRunConflicting() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); final KafkaIndexTask task2 = createTask( @@ -1556,17 +1505,12 @@ public void testRunConflicting() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Run first task final ListenableFuture future1 = runTask(task1); @@ -1612,8 +1556,7 @@ public void testRunConflictingWithoutTransactions() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, false, null, - null, - false + null ) ); final KafkaIndexTask task2 = createTask( @@ -1627,17 +1570,12 @@ public void testRunConflictingWithoutTransactions() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, false, null, - null, - false + null ) ); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Run first task final ListenableFuture future1 = runTask(task1); @@ -1688,20 +1626,14 @@ public void testRunOneTaskTwoPartitions() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); final ListenableFuture future = runTask(task); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - kafkaProducer.flush(); - } + insertData(); // Wait for tasks to exit Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); @@ -1754,8 +1686,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); final KafkaIndexTask task2 = createTask( @@ -1769,8 +1700,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1778,11 +1708,7 @@ public void 
testRunTwoTasksTwoPartitions() throws Exception final ListenableFuture future2 = runTask(task2); // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); // Wait for tasks to exit Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode()); @@ -1821,13 +1747,12 @@ public void testRestore() throws Exception 0, "sequence0", new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)), - new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)), kafkaServer.consumerProperties(), KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1835,9 +1760,12 @@ public void testRestore() throws Exception // Insert some data, but not enough for the task to finish try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (ProducerRecord record : Iterables.limit(records, 4)) { kafkaProducer.send(record).get(); } + kafkaProducer.commitTransaction(); } while (countEvents(task1) != 2) { @@ -1859,13 +1787,12 @@ public void testRestore() throws Exception 0, "sequence0", new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)), - new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)), kafkaServer.consumerProperties(), KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1873,12 +1800,16 @@ public void testRestore() throws Exception // Insert remaining data try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (ProducerRecord record : Iterables.skip(records, 4)) { kafkaProducer.send(record).get(); } + kafkaProducer.commitTransaction(); } // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode()); // Check metrics @@ -1894,7 +1825,7 @@ public void testRestore() throws Exception SegmentDescriptor desc2 = SD(task1, "2011/P1D", 0); Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); Assert.assertEquals( - new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))), + new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L))), metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) ); @@ -1912,13 +1843,12 @@ public void testRunWithPauseAndResume() throws Exception 0, "sequence0", new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 2L)), - new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L)), kafkaServer.consumerProperties(), KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -1926,10 +1856,13 @@ public void testRunWithPauseAndResume() throws Exception // Insert some data, but not enough for the task to finish try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (ProducerRecord record : Iterables.limit(records, 4)) { kafkaProducer.send(record).get(); } kafkaProducer.flush(); + kafkaProducer.commitTransaction(); } while (countEvents(task) != 2) { @@ -1946,12 +1879,14 @@ public void testRunWithPauseAndResume() throws 
Exception } ); Assert.assertEquals(Status.PAUSED, task.getRunner().getStatus()); - // Insert remaining data try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (ProducerRecord record : Iterables.skip(records, 4)) { kafkaProducer.send(record).get(); } + kafkaProducer.commitTransaction(); } try { @@ -1979,7 +1914,7 @@ public void testRunWithPauseAndResume() throws Exception SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); Assert.assertEquals( - new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L))), + new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 6L))), metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) ); @@ -2002,8 +1937,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -2025,11 +1959,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva { resetOffsetAutomatically = true; // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -2042,8 +1972,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -2070,11 +1999,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception return; } // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final TreeMap> sequences = new TreeMap<>(); // Here the sequence number is 1 meaning that one incremental handoff was done by the failed task @@ -2097,8 +2022,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ), context ); @@ -2131,11 +2055,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception public void testRunWithDuplicateRequest() throws Exception { // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); final KafkaIndexTask task = createTask( null, @@ -2148,8 +2068,7 @@ public void testRunWithDuplicateRequest() throws Exception KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, null, - null, - false + null ) ); @@ -2170,6 +2089,134 @@ public void testRunWithDuplicateRequest() throws Exception Assert.assertEquals(Status.READING, task.getRunner().getStatus()); } + @Test(timeout = 60_000L) + public void testRunTransactionModeRollback() throws Exception + { + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L)), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null + ) + ); + + final ListenableFuture future = runTask(task); + + // Insert 2 records initially + try (final 
KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : Iterables.limit(records, 2)) { + kafkaProducer.send(record).get(); + } + kafkaProducer.commitTransaction(); + } + + while (countEvents(task) != 2) { + Thread.sleep(25); + } + + Assert.assertEquals(2, countEvents(task)); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + + //verify the 2 indexed records + final QuerySegmentSpec firstInterval = objectMapper.readValue( + "\"2008/2010\"", QuerySegmentSpec.class + ); + Iterable scanResultValues = scanData(task, firstInterval); + Assert.assertEquals(2, Iterables.size(scanResultValues)); + + // Insert 3 more records and rollback + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : Iterables.limit(Iterables.skip(records, 2), 3)) { + kafkaProducer.send(record).get(); + } + kafkaProducer.flush(); + kafkaProducer.abortTransaction(); + } + + Assert.assertEquals(2, countEvents(task)); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + + final QuerySegmentSpec rollbackedInterval = objectMapper.readValue( + "\"2010/2012\"", QuerySegmentSpec.class + ); + scanResultValues = scanData(task, rollbackedInterval); + //verify that there are no records indexed in the rollbacked time period + Assert.assertEquals(0, Iterables.size(scanResultValues)); + + // Insert remaining data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : Iterables.skip(records, 5)) { + kafkaProducer.send(record).get(); + } + kafkaProducer.commitTransaction(); + } + + final QuerySegmentSpec endInterval = objectMapper.readValue( + "\"2008/2049\"", QuerySegmentSpec.class + ); + Iterable scanResultValues1 = scanData(task, endInterval); + Assert.assertEquals(2, Iterables.size(scanResultValues1)); + + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets()); + + // Check metrics + Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed()); + Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable()); + Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2009/P1D", 0); + SegmentDescriptor desc3 = SD(task, "2013/P1D", 0); + SegmentDescriptor desc4 = SD(task, "2049/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 13L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("a"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc2)); + Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc3)); + Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4)); + } + + private List scanData(final Task task, QuerySegmentSpec spec) + { + ScanQuery query = new Druids.ScanQueryBuilder().dataSource( + 
DATA_SCHEMA.getDataSource()).intervals(spec).build(); + List results = + task.getQueryRunner(query).run(wrap(query), new HashMap<>()).toList(); + return results; + } + + private void insertData() throws ExecutionException, InterruptedException + { + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : records) { + kafkaProducer.send(record).get(); + } + kafkaProducer.commitTransaction(); + } + } + private ListenableFuture runTask(final Task task) { try { @@ -2338,7 +2385,7 @@ private static DataSchema cloneDataSchema(final DataSchema dataSchema) ); } - private QueryRunnerFactoryConglomerate makeTimeseriesOnlyConglomerate() + private QueryRunnerFactoryConglomerate makeTimeseriesAndScanConglomerate() { IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator( null, @@ -2353,16 +2400,33 @@ public QueryRunner decorate(QueryRunner delegate, QueryToolChest { - // do nothing - } + ImmutableMap., QueryRunnerFactory>builder() + .put( + TimeseriesQuery.class, + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(queryRunnerDecorator), + new TimeseriesQueryEngine(), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + // do nothing + } + } + ) ) - ) + .put( + ScanQuery.class, + new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper()) + ), + new ScanQueryEngine() + ) + ) + .build() ); } @@ -2492,7 +2556,7 @@ public List getLocations() new TestDataSegmentAnnouncer(), EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, - this::makeTimeseriesOnlyConglomerate, + this::makeTimeseriesAndScanConglomerate, Execs.directExecutor(), // queryExecutorService EasyMock.createMock(MonitorScheduler.class), new SegmentLoaderFactory( @@ -2603,7 +2667,7 @@ public long countEvents(final Task task) .build(); List> results = - task.getQueryRunner(query).run(QueryPlus.wrap(query), ImmutableMap.of()).toList(); + task.getQueryRunner(query).run(wrap(query), ImmutableMap.of()).toList(); return results.isEmpty() ? 
0L : DimensionHandlerUtils.nullToZero(results.get(0).getValue().getLongMetric("rows")); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index 2f445aa99431..f944bf04610b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -166,11 +166,7 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException { // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); Set> partitions = ImmutableSet.of( StreamPartition.of(topic, 0), @@ -195,11 +191,7 @@ public void testPoll() throws InterruptedException, ExecutionException { // Insert data - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } + insertData(); Set> partitions = ImmutableSet.of( StreamPartition.of(topic, 0), @@ -232,10 +224,13 @@ public void testPoll() throws InterruptedException, ExecutionException public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionException { // Insert data - - KafkaProducer producer = kafkaServer.newProducer(); - for (ProducerRecord record : records.subList(0, 13)) { - producer.send(record).get(); + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : records.subList(0, 13)) { + kafkaProducer.send(record).get(); + } + kafkaProducer.commitTransaction(); } Set> partitions = ImmutableSet.of( @@ -257,8 +252,13 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE } // Insert data - for (ProducerRecord rec : records.subList(13, 15)) { - producer.send(rec).get(); + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + for (ProducerRecord record : records.subList(13, 15)) { + kafkaProducer.send(record).get(); + } + kafkaProducer.commitTransaction(); } @@ -270,8 +270,28 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE List> initialRecords = createOrderedPartitionableRecords(); Assert.assertEquals(records.size(), polledRecords.size()); - Assert.assertTrue(initialRecords.containsAll(polledRecords)); + Assert.assertEquals(partitions, recordSupplier.getAssignment()); + final int initialRecordsPartition0Size = initialRecords.stream() + .filter(r -> r.getPartitionId().equals(0)) + .collect(Collectors.toSet()) + .size(); + final int initialRecordsPartition1Size = initialRecords.stream() + .filter(r -> r.getPartitionId().equals(1)) + .collect(Collectors.toSet()) + .size(); + + final int polledRecordsPartition0Size = polledRecords.stream() + .filter(r -> r.getPartitionId().equals(0)) + .collect(Collectors.toSet()) + .size(); + final int polledRecordsPartition1Size = polledRecords.stream() + .filter(r -> r.getPartitionId().equals(1)) + .collect(Collectors.toSet()) + .size(); + + Assert.assertEquals(initialRecordsPartition0Size, polledRecordsPartition0Size); + 
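    // The counts are compared per partition rather than re-asserting exact offsets
    // (the earlier containsAll check) because each committed transaction also writes
    // a control record per partition; those commit markers occupy offsets and shift
    // absolute positions, which is also why seekToLatest in testPosition below now
    // reports 12 instead of 11.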
+    Assert.assertEquals(initialRecordsPartition1Size, polledRecordsPartition1Size);

     recordSupplier.close();
   }

@@ -280,11 +300,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE
   public void testSeek() throws InterruptedException, ExecutionException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();

     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
@@ -326,11 +342,7 @@ public void testSeek() throws InterruptedException, ExecutionException
   public void testSeekToLatest() throws InterruptedException, ExecutionException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();

     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
@@ -388,11 +400,7 @@ public void testSeekUnassigned() throws InterruptedException, ExecutionException
   public void testPosition() throws ExecutionException, InterruptedException
   {
     // Insert data
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
+    insertData();

     StreamPartition<Integer> partition0 = StreamPartition.of(topic, 0);
     StreamPartition<Integer> partition1 = StreamPartition.of(topic, 1);
@@ -420,7 +428,7 @@ public void testPosition() throws ExecutionException, InterruptedException
     Assert.assertEquals(0L, (long) recordSupplier.getPosition(partition0));

     recordSupplier.seekToLatest(Collections.singleton(partition0));
-    Assert.assertEquals(11L, (long) recordSupplier.getPosition(partition0));
+    Assert.assertEquals(12L, (long) recordSupplier.getPosition(partition0));

     long prevPos = recordSupplier.getPosition(partition0);
     recordSupplier.getEarliestSequenceNumber(partition0);
@@ -433,5 +441,16 @@ public void testPosition() throws ExecutionException, InterruptedException

     recordSupplier.close();
   }
+
+  private void insertData() throws ExecutionException, InterruptedException
+  {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
index 3337faa080ad..ee3dfe8e01c5 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java
@@ -78,7 +78,6 @@ public void testSerdeWithDefaults() throws Exception
     Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout());
     Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent());
     Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent());
-    Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
   }

   @Test
@@ -97,8 +96,7 @@
public void testSerdeWithNonDefaults() throws Exception + " \"useEarliestOffset\": true,\n" + " \"completionTimeout\": \"PT45M\",\n" + " \"lateMessageRejectionPeriod\": \"PT1H\",\n" - + " \"earlyMessageRejectionPeriod\": \"PT1H\",\n" - + " \"skipOffsetGaps\": true\n" + + " \"earlyMessageRejectionPeriod\": \"PT1H\"\n" + "}"; KafkaSupervisorIOConfig config = mapper.readValue( @@ -122,7 +120,6 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout()); Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get()); Assert.assertEquals(Duration.standardHours(1), config.getEarlyMessageRejectionPeriod().get()); - Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index a08806acc173..f2db280c51eb 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -258,7 +258,7 @@ public static void tearDownClass() throws IOException @Test public void testNoInitialState() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(); @@ -289,7 +289,6 @@ public void testNoInitialState() throws Exception Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent()); Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent()); - Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps()); Assert.assertEquals(topic, taskConfig.getStartPartitions().getStream()); Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionSequenceNumberMap().get(0)); @@ -305,7 +304,7 @@ public void testNoInitialState() throws Exception @Test public void testSkipOffsetGaps() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(); @@ -328,13 +327,12 @@ public void testSkipOffsetGaps() throws Exception KafkaIndexTask task = captured.getValue(); KafkaIndexTaskIOConfig taskConfig = task.getIOConfig(); - Assert.assertTrue("skipOffsetGaps", taskConfig.isSkipOffsetGaps()); } @Test public void testMultiTask() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -380,7 +378,7 @@ public void testMultiTask() throws Exception @Test public void testReplicas() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -417,7 +415,7 @@ public void testReplicas() throws Exception @Test public void testLateMessageRejectionPeriod() throws Exception { - supervisor = getSupervisor(2, 1, 
true, "PT1H", new Period("PT1H"), null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -456,7 +454,7 @@ public void testLateMessageRejectionPeriod() throws Exception @Test public void testEarlyMessageRejectionPeriod() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"), false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H")); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -498,7 +496,7 @@ public void testEarlyMessageRejectionPeriod() throws Exception */ public void testLatestOffset() throws Exception { - supervisor = getSupervisor(1, 1, false, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, false, "PT1H", null, null); addSomeEvents(1100); Capture captured = Capture.newInstance(); @@ -518,9 +516,9 @@ public void testLatestOffset() throws Exception verifyAll(); KafkaIndexTask task = captured.getValue(); - Assert.assertEquals(1100L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(0)); - Assert.assertEquals(1100L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(1)); - Assert.assertEquals(1100L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(2)); + Assert.assertEquals(1101L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(0)); + Assert.assertEquals(1101L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(1)); + Assert.assertEquals(1101L, (long) task.getIOConfig().getStartPartitions().getPartitionSequenceNumberMap().get(2)); } @Test @@ -530,7 +528,7 @@ public void testLatestOffset() throws Exception */ public void testDatasourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(); @@ -560,7 +558,7 @@ public void testDatasourceMetadata() throws Exception @Test(expected = ISE.class) public void testBadMetadataOffsets() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); @@ -579,7 +577,7 @@ public void testBadMetadataOffsets() throws Exception @Test public void testKillIncompatibleTasks() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); // unexpected # of partitions (kill) @@ -684,7 +682,7 @@ public void testKillIncompatibleTasks() throws Exception @Test public void testKillBadPartitionAssignment() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 2, true, "PT1H", null, null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -791,7 +789,7 @@ public void testKillBadPartitionAssignment() throws Exception @Test public void testRequeueTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -880,7 +878,7 @@ public void testRequeueTaskWhenFailed() throws Exception @Test public 
void testRequeueAdoptedTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); DateTime now = DateTimes.nowUtc(); @@ -981,7 +979,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception @Test public void testQueueNextTasksOnSuccess() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1083,7 +1081,7 @@ public void testBeginPublishAndQueueNextTasks() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1178,7 +1176,7 @@ public void testDiscoverExistingPublishingTask() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Task task = createKafkaIndexTask( @@ -1284,7 +1282,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(1); Task task = createKafkaIndexTask( @@ -1383,7 +1381,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); addSomeEvents(6); Task id1 = createKafkaIndexTask( @@ -1474,23 +1472,23 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception Assert.assertEquals(startTime, activeReport.getStartTime()); Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), activeReport.getStartingOffsets()); Assert.assertEquals(ImmutableMap.of(0, 4L, 1, 5L, 2, 6L), activeReport.getCurrentOffsets()); - Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), activeReport.getLag()); + Assert.assertEquals(ImmutableMap.of(0, 3L, 1, 2L, 2, 1L), activeReport.getLag()); Assert.assertEquals("id1", publishingReport.getId()); Assert.assertEquals(ImmutableMap.of(0, 0L, 1, 0L, 2, 0L), publishingReport.getStartingOffsets()); Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 2L, 2, 3L), publishingReport.getCurrentOffsets()); Assert.assertEquals(null, publishingReport.getLag()); - Assert.assertEquals(ImmutableMap.of(0, 6L, 1, 6L, 2, 6L), payload.getLatestOffsets()); - Assert.assertEquals(ImmutableMap.of(0, 2L, 1, 1L, 2, 0L), payload.getMinimumLag()); - Assert.assertEquals(3L, (long) payload.getAggregateLag()); + Assert.assertEquals(ImmutableMap.of(0, 7L, 1, 7L, 2, 7L), payload.getLatestOffsets()); + Assert.assertEquals(ImmutableMap.of(0, 3L, 1, 2L, 2, 1L), payload.getMinimumLag()); + Assert.assertEquals(6L, (long) payload.getAggregateLag()); Assert.assertTrue(payload.getOffsetsLastUpdated().plusMinutes(1).isAfterNow()); } @Test public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception { - supervisor = 
getSupervisor(2, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1H", null, null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1549,7 +1547,7 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1635,7 +1633,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234, -1); - supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false); + supervisor = getSupervisor(2, 2, true, "PT1M", null, null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -1726,7 +1724,7 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception @Test(expected = IllegalStateException.class) public void testStopNotStarted() { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); supervisor.stop(false); } @@ -1738,7 +1736,7 @@ public void testStop() taskRunner.unregisterListener(StringUtils.format("KafkaSupervisor-%s", DATASOURCE)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); supervisor.start(); supervisor.stop(false); @@ -1752,7 +1750,7 @@ public void testStopGracefully() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -1864,7 +1862,7 @@ public void testResetNoTasks() throws Exception taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); supervisor.start(); supervisor.runInternal(); verifyAll(); @@ -1881,7 +1879,7 @@ public void testResetNoTasks() throws Exception @Test public void testResetDataSourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -1937,7 +1935,7 @@ public void testResetDataSourceMetadata() throws Exception @Test public void testResetNoDataSourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); @@ -1970,7 +1968,7 @@ public void testResetRunningTasks() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = 
DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -2068,7 +2066,7 @@ public void testResetRunningTasks() throws Exception public void testNoDataIngestionTasks() throws Exception { final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1S", null, null); //not adding any events Task id1 = createKafkaIndexTask( "id1", @@ -2164,7 +2162,7 @@ public void testNoDataIngestionTasks() throws Exception public void testCheckpointForInactiveTaskGroup() throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException { - supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1S", null, null); //not adding any events final Task id1 = createKafkaIndexTask( "id1", @@ -2267,7 +2265,7 @@ public void testCheckpointForInactiveTaskGroup() public void testCheckpointForUnknownTaskGroup() throws InterruptedException { - supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + supervisor = getSupervisor(2, 1, true, "PT1S", null, null); //not adding any events final Task id1 = createKafkaIndexTask( "id1", @@ -2346,7 +2344,7 @@ public void testCheckpointForUnknownTaskGroup() public void testCheckpointWithNullTaskGroupId() throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException { - supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false); + supervisor = getSupervisor(1, 3, true, "PT1S", null, null); //not adding any events final Task id1 = createKafkaIndexTask( "id1", @@ -2438,7 +2436,7 @@ public void testCheckpointWithNullTaskGroupId() @Test public void testSuspendedNoRunningTasks() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost); addSomeEvents(1); expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -2471,7 +2469,7 @@ public void testSuspendedRunningTasks() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145, -1); final DateTime startTime = DateTimes.nowUtc(); - supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true, kafkaHost); + supervisor = getSupervisor(2, 1, true, "PT1H", null, null, true, kafkaHost); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -2579,7 +2577,7 @@ public void testResetSuspended() throws Exception taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost); + supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true, kafkaHost); supervisor.start(); supervisor.runInternal(); verifyAll(); @@ -2604,7 +2602,6 @@ public void testFailedInitializationAndRecovery() throws Exception null, null, false, - false, StringUtils.format("badhostname:%d", kafkaServer.getPort()) ); addSomeEvents(1); @@ -2670,7 +2667,6 @@ public void testFailedInitializationAndRecovery() throws Exception Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent()); Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent()); - Assert.assertFalse("skipOffsetGaps", 
taskConfig.isSkipOffsetGaps()); Assert.assertEquals(topic, taskConfig.getStartPartitions().getStream()); Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionSequenceNumberMap().get(0)); @@ -2686,7 +2682,7 @@ public void testFailedInitializationAndRecovery() throws Exception @Test public void testGetCurrentTotalStats() { - supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false); + supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false, kafkaHost); supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition(0), ImmutableMap.of(0, 0L), @@ -2734,6 +2730,8 @@ private void addSomeEvents(int numEventsPerPartition) throws Exception AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); for (int i = 0; i < NUM_PARTITIONS; i++) { for (int j = 0; j < numEventsPerPartition; j++) { kafkaProducer.send( @@ -2746,6 +2744,7 @@ private void addSomeEvents(int numEventsPerPartition) throws Exception ).get(); } } + kafkaProducer.commitTransaction(); } } @@ -2755,8 +2754,7 @@ private KafkaSupervisor getSupervisor( boolean useEarliestOffset, String duration, Period lateMessageRejectionPeriod, - Period earlyMessageRejectionPeriod, - boolean skipOffsetGaps + Period earlyMessageRejectionPeriod ) { return getSupervisor( @@ -2766,7 +2764,6 @@ private KafkaSupervisor getSupervisor( duration, lateMessageRejectionPeriod, earlyMessageRejectionPeriod, - skipOffsetGaps, false, kafkaHost ); @@ -2779,7 +2776,6 @@ private KafkaSupervisor getSupervisor( String duration, Period lateMessageRejectionPeriod, Period earlyMessageRejectionPeriod, - boolean skipOffsetGaps, boolean suspended, String kafkaHost ) @@ -2787,6 +2783,7 @@ private KafkaSupervisor getSupervisor( Map consumerProperties = new HashMap<>(); consumerProperties.put("myCustomKey", "myCustomValue"); consumerProperties.put("bootstrap.servers", kafkaHost); + consumerProperties.put("isolation.level", "read_committed"); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, replicas, @@ -2799,8 +2796,7 @@ private KafkaSupervisor getSupervisor( useEarliestOffset, new Period("PT30M"), lateMessageRejectionPeriod, - earlyMessageRejectionPeriod, - skipOffsetGaps + earlyMessageRejectionPeriod ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( @@ -2908,8 +2904,7 @@ private KafkaIndexTask createKafkaIndexTask( KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, true, minimumMessageTime, - maximumMessageTime, - false + maximumMessageTime ), Collections.emptyMap(), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java index 6659f92ebbf0..10c9b2ef4096 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestBroker.java @@ -40,11 +40,12 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Random; import java.util.concurrent.ThreadLocalRandom; public class TestBroker implements Closeable { - + private static final Random RANDOM = ThreadLocalRandom.current(); private final String 
zookeeperConnect; private final File directory; private final boolean directoryCleanup; @@ -77,6 +78,9 @@ public void start() props.setProperty("broker.id", String.valueOf(id)); props.setProperty("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000)); props.setProperty("advertised.host.name", "localhost"); + props.setProperty("transaction.state.log.replication.factor", "1"); + props.setProperty("offsets.topic.replication.factor", "1"); + props.setProperty("transaction.state.log.min.isr", "1"); props.putAll(brokerProps); final KafkaConfig config = new KafkaConfig(props); @@ -112,6 +116,8 @@ public Map producerProperties() props.put("key.serializer", ByteArraySerializer.class.getName()); props.put("value.serializer", ByteArraySerializer.class.getName()); props.put("acks", "all"); + props.put("enable.idempotence", "true"); + props.put("transactional.id", String.valueOf(RANDOM.nextInt())); return props; } @@ -121,8 +127,9 @@ public Map consumerProperties() props.put("bootstrap.servers", StringUtils.format("localhost:%d", getPort())); props.put("key.deserializer", ByteArrayDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); - props.put("group.id", String.valueOf(ThreadLocalRandom.current().nextInt())); + props.put("group.id", String.valueOf(RANDOM.nextInt())); props.put("auto.offset.reset", "earliest"); + props.put("isolation.level", "read_committed"); return props; } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 307e971bcfd3..8dd32f246342 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -68,7 +68,6 @@ public KinesisIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - true, exclusiveStartSequenceNumberPartitions ); Preconditions.checkArgument(endPartitions.getPartitionSequenceNumberMap() diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java index fff34f946b26..ab782299ee79 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java @@ -89,7 +89,6 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNull(config.getAwsAssumedRoleArn()); Assert.assertNull(config.getAwsExternalId()); Assert.assertFalse(config.isDeaggregate()); - Assert.assertTrue(config.isSkipOffsetGaps()); } @Test @@ -146,7 +145,6 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals("role", config.getAwsAssumedRoleArn()); Assert.assertEquals("awsexternalid", config.getAwsExternalId()); Assert.assertTrue(config.isDeaggregate()); - Assert.assertTrue(config.isSkipOffsetGaps()); } @Test diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 
29186961ce66..bfabab15911d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -64,7 +64,7 @@ import java.util.concurrent.ThreadLocalRandom; -public abstract class SeekableStreamIndexTask extends AbstractTask +public abstract class SeekableStreamIndexTask extends AbstractTask implements ChatHandler { public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index 6c469c7d0123..dde9702b22fa 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -43,7 +43,6 @@ public abstract class SeekableStreamIndexTaskIOConfig minimumMessageTime; private final Optional maximumMessageTime; - private final boolean skipOffsetGaps; private final Set exclusiveStartSequenceNumberPartitions; @JsonCreator @@ -55,7 +54,6 @@ public SeekableStreamIndexTaskIOConfig( @JsonProperty("useTransaction") Boolean useTransaction, @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, - @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps, @JsonProperty("exclusiveStartSequenceNumberPartitions") Set exclusiveStartSequenceNumberPartitions ) @@ -67,7 +65,6 @@ public SeekableStreamIndexTaskIOConfig( this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); - this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS; this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions == null ? 
Collections.emptySet() : exclusiveStartSequenceNumberPartitions; @@ -137,9 +134,4 @@ public Optional getMinimumMessageTime() return minimumMessageTime; } - @JsonProperty - public boolean isSkipOffsetGaps() - { - return skipOffsetGaps; - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index c86a2b509c0c..b2b906ee57ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -130,7 +130,7 @@ * @param Partition Number Type * @param Sequence Number Type */ -public abstract class SeekableStreamIndexTaskRunner implements ChatHandler +public abstract class SeekableStreamIndexTaskRunner implements ChatHandler { public enum Status { @@ -474,7 +474,7 @@ public void run() } // if stop is requested or task's end sequence is set by call to setEndOffsets method with finish set to true - if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) { + if (stopRequested.get() || sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) { status = Status.PUBLISHING; } @@ -504,10 +504,11 @@ public void run() SequenceMetadata sequenceToCheckpoint = null; for (OrderedPartitionableRecord record : records) { + // for Kafka, the end offsets are exclusive, so skip it if (isEndSequenceOffsetsExclusive() && createSequenceNumber(record.getSequenceNumber()).compareTo( - createSequenceNumber(endOffsets.get(record.getPartitionId()))) == 0) { + createSequenceNumber(endOffsets.get(record.getPartitionId()))) >= 0) { continue; } @@ -530,17 +531,6 @@ public void run() } else if (createSequenceNumber(record.getSequenceNumber()).compareTo( createSequenceNumber(endOffsets.get(record.getPartitionId()))) <= 0) { - - if (!record.getSequenceNumber().equals(currOffsets.get(record.getPartitionId())) - && !ioConfig.isSkipOffsetGaps()) { - throw new ISE( - "WTF?! 
Got sequence[%s] after sequence[%s] in partition[%s].", - record.getSequenceNumber(), - currOffsets.get(record.getPartitionId()), - record.getPartitionId() - ); - } - try { final List valueBytess = record.getData(); final List rows; @@ -1897,7 +1887,7 @@ private boolean verifyInitialRecordAndSkipExclusivePartition( ) { if (intialSequenceSnapshot.containsKey(record.getPartitionId())) { - if (!intialSequenceSnapshot.get(record.getPartitionId()).equals(record.getSequenceNumber())) { + if (record.getSequenceNumber().compareTo(intialSequenceSnapshot.get(record.getPartitionId())) < 0) { throw new ISE( "Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]", record.getSequenceNumber(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java index 4dd653e760ff..a122d9e15213 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java @@ -35,7 +35,7 @@ * @param partition id * @param sequence number */ -public class OrderedPartitionableRecord +public class OrderedPartitionableRecord { private final String stream; private final PartitionIdType partitionId; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java index d9e599da0c80..3a6e87ed3b24 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java @@ -36,7 +36,7 @@ * @param Sequence Number Type */ @Beta -public interface RecordSupplier extends Closeable +public interface RecordSupplier extends Closeable { /** * assigns the given partitions to this RecordSupplier diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 25250ac0487c..4c6509d31f82 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -119,7 +119,7 @@ * @param the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type * @param the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers */ -public abstract class SeekableStreamSupervisor +public abstract class SeekableStreamSupervisor implements Supervisor { public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED"; diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 24c910c18807..6159021d9e6b 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -32,7 +32,7 @@ - 0.10.2.2 + 2.1.0 @@ -51,17 +51,6 @@ druid-datasketches ${project.parent.version} - - org.apache.druid.extensions - druid-kafka-eight - ${project.parent.version} - - - kafka_2.10 - org.apache.kafka - - - 
org.apache.druid.extensions druid-histogram diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java deleted file mode 100644 index 10f9aab3bb19..000000000000 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.tests.indexer; - -import com.google.common.base.Throwables; -import com.google.inject.Inject; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; -import org.apache.commons.io.IOUtils; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.testing.IntegrationTestingConfig; -import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; -import org.apache.druid.testing.utils.TestQueryHelper; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeSuite; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Callable; - -/* - * This is a test for the kafka firehose. - */ -@Guice(moduleFactory = DruidTestModuleFactory.class) -public class ITKafkaTest extends AbstractIndexerTest -{ - private static final Logger LOG = new Logger(ITKafkaTest.class); - private static final int DELAY_BETWEEN_EVENTS_SECS = 5; - private static final String INDEXER_FILE = "/indexer/kafka_index_task.json"; - private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json"; - private static final String DATASOURCE = "kafka_test"; - private static final String TOPIC_NAME = "kafkaTopic"; - private static final int MINUTES_TO_SEND = 2; - public static final String testPropertyPrefix = "kafka.test.property."; - - - // We'll fill in the current time and numbers for added, deleted and changed - // before sending the event. 
- final String event_template = - "{\"timestamp\": \"%s\"," + - "\"page\": \"Gypsy Danger\"," + - "\"language\" : \"en\"," + - "\"user\" : \"nuclear\"," + - "\"unpatrolled\" : \"true\"," + - "\"newPage\" : \"true\"," + - "\"robot\": \"false\"," + - "\"anonymous\": \"false\"," + - "\"namespace\":\"article\"," + - "\"continent\":\"North America\"," + - "\"country\":\"United States\"," + - "\"region\":\"Bay Area\"," + - "\"city\":\"San Francisco\"," + - "\"added\":%d," + - "\"deleted\":%d," + - "\"delta\":%d}"; - - private String taskID; - private ZkClient zkClient; - private ZkUtils zkUtils; - private boolean segmentsExist; // to tell if we should remove segments during teardown - - // format for the querying interval - private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'"); - // format for the expected timestamp in a query response - private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'"); - private DateTime dtFirst; // timestamp of 1st event - private DateTime dtLast; // timestamp of last event - - @Inject - private TestQueryHelper queryHelper; - @Inject - private IntegrationTestingConfig config; - - private String fullDatasourceName; - - @BeforeSuite - public void setFullDatasourceName() - { - fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix(); - } - @Test - public void testKafka() - { - LOG.info("Starting test: ITKafkaTest"); - - // create topic - try { - int sessionTimeoutMs = 10000; - int connectionTimeoutMs = 10000; - String zkHosts = config.getZookeeperHosts(); - zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$); - zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false); - if (config.manageKafkaTopic()) { - int numPartitions = 1; - int replicationFactor = 1; - Properties topicConfig = new Properties(); - // addFilteredProperties(topicConfig); - AdminUtils.createTopic( - zkUtils, - TOPIC_NAME, - numPartitions, - replicationFactor, - topicConfig, - RackAwareMode.Disabled$.MODULE$ - ); - } - } - catch (Exception e) { - throw new ISE(e, "could not create kafka topic"); - } - - // set up kafka producer - Properties properties = new Properties(); - addFilteredProperties(properties); - properties.put("bootstrap.servers", config.getKafkaHost()); - LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost()); - properties.put("acks", "all"); - properties.put("retries", "3"); - - KafkaProducer producer = new KafkaProducer<>( - properties, - new StringSerializer(), - new StringSerializer() - ); - - DateTimeZone zone = DateTimes.inferTzFromString("UTC"); - // format for putting into events - DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"); - - DateTime dt = new DateTime(zone); // timestamp to put on events - dtFirst = dt; // timestamp of 1st event - dtLast = dt; // timestamp of last event - - // these are used to compute the expected aggregations - int added = 0; - int num_events = 10; - - // send data to kafka - for (int i = 0; i < num_events; i++) { - added += i; - // construct the event to send - String event = StringUtils.format( - event_template, - event_fmt.print(dt), i, 0, i - ); - LOG.info("sending event: [%s]", event); - try { - // Send event to kafka - producer.send(new ProducerRecord(TOPIC_NAME, event)).get(); - } - catch (Exception ioe) { - throw Throwables.propagate(ioe); - } - - dtLast = dt; - dt = new DateTime(zone); - } - - producer.close(); - - 
String indexerSpec; - - // replace temp strings in indexer file - try { - LOG.info("indexerFile name: [%s]", INDEXER_FILE); - - Properties consumerProperties = new Properties(); - consumerProperties.put("zookeeper.connect", config.getZookeeperInternalHosts()); - consumerProperties.put("zookeeper.connection.timeout.ms", "15000"); - consumerProperties.put("zookeeper.sync.time.ms", "5000"); - consumerProperties.put("group.id", Long.toString(System.currentTimeMillis())); - consumerProperties.put("fetch.message.max.bytes", "1048586"); - consumerProperties.put("auto.offset.reset", "smallest"); - consumerProperties.put("auto.commit.enable", "false"); - - addFilteredProperties(consumerProperties); - - indexerSpec = getTaskAsString(INDEXER_FILE); - indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", fullDatasourceName); - indexerSpec = StringUtils.replace(indexerSpec, "%%TOPIC%%", TOPIC_NAME); - indexerSpec = StringUtils.replace(indexerSpec, "%%COUNT%%", Integer.toString(num_events)); - String consumerPropertiesJson = jsonMapper.writeValueAsString(consumerProperties); - indexerSpec = StringUtils.replace(indexerSpec, "%%CONSUMER_PROPERTIES%%", consumerPropertiesJson); - - LOG.info("indexerFile: [%s]\n", indexerSpec); - } - catch (Exception e) { - // log here so the message will appear in the console output - LOG.error("could not read indexer file [%s]", INDEXER_FILE); - throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE); - } - - // start indexing task - taskID = indexer.submitTask(indexerSpec); - LOG.info("-------------SUBMITTED TASK"); - - // wait for the task to finish - indexer.waitUntilTaskCompletes(taskID, 10000, 60); - - // wait for segments to be handed off - try { - RetryUtil.retryUntil( - new Callable() - { - @Override - public Boolean call() - { - return coordinator.areSegmentsLoaded(fullDatasourceName); - } - }, - true, - 10000, - 30, - "Real-time generated segments loaded" - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - LOG.info("segments are present"); - segmentsExist = true; - - // put the timestamps into the query structure - String queryResponseTemplate; - InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE); - if (null == is) { - throw new ISE("could not open query file: %s", QUERIES_FILE); - } - - try { - queryResponseTemplate = IOUtils.toString(is, "UTF-8"); - } - catch (IOException e) { - throw new ISE(e, "could not read query file: %s", QUERIES_FILE); - } - - String queryStr = queryResponseTemplate; - queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName); - // time boundary - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast)); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst)); - // time series - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst)); - String queryEnd = INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_END%%", queryEnd); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_ADDED%%", Integer.toString(added)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events)); - - // 
this query will probably be answered from the realtime task - try { - this.queryHelper.testQueriesFromString(queryStr, 2); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @AfterClass - public void afterClass() - { - LOG.info("teardown"); - if (config.manageKafkaTopic()) { - // delete kafka topic - AdminUtils.deleteTopic(zkUtils, TOPIC_NAME); - } - - // remove segments - if (segmentsExist) { - unloadAndKillData(fullDatasourceName); - } - } - - public void addFilteredProperties(Properties properties) - { - for (Map.Entry entry : config.getProperties().entrySet()) { - if (entry.getKey().startsWith(testPropertyPrefix)) { - properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue()); - } - } - } -} - diff --git a/integration-tests/src/test/resources/indexer/kafka_index_task.json b/integration-tests/src/test/resources/indexer/kafka_index_task.json deleted file mode 100644 index 55e28c7c47a2..000000000000 --- a/integration-tests/src/test/resources/indexer/kafka_index_task.json +++ /dev/null @@ -1,68 +0,0 @@ -{ - "type" : "index_realtime", - "spec" : { - "dataSchema": { - "dataSource": "%%DATASOURCE%%", - "parser" : { - "type" : "string", - "parseSpec" : { - "format" : "json", - "timestampSpec" : { - "column" : "timestamp", - "format" : "auto" - }, - "dimensionsSpec" : { - "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"], - "dimensionExclusions" : [], - "spatialDimensions" : [] - } - } - }, - "metricsSpec": [ - { - "type": "count", - "name": "count" - }, - { - "type": "doubleSum", - "name": "added", - "fieldName": "added" - }, - { - "type": "doubleSum", - "name": "deleted", - "fieldName": "deleted" - }, - { - "type": "doubleSum", - "name": "delta", - "fieldName": "delta" - } - ], - "granularitySpec": { - "type" : "uniform", - "segmentGranularity": "MINUTE", - "queryGranularity": "NONE" - } - }, - "ioConfig" : { - "type" : "realtime", - "firehose": { - "type": "fixedCount", - "count": "%%COUNT%%", - "delegate": { - "type": "kafka-0.8", - "consumerProps": %%CONSUMER_PROPERTIES%%, - "feed": "%%TOPIC%%" - } - } - }, - "tuningConfig": { - "type" : "realtime", - "maxRowsInMemory": 500000, - "intermediatePersistPeriod": "PT3M", - "windowPeriod": "PT150S", - "basePersistDirectory": "/home/y/var/druid_state/kafka_test/realtime/basePersist" - } - } -} diff --git a/pom.xml b/pom.xml index 351e735b0e53..95ca07b21e9d 100644 --- a/pom.xml +++ b/pom.xml @@ -696,7 +696,7 @@ org.slf4j slf4j-api - 1.6.4 + 1.7.25 org.roaringbitmap
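A note on the pattern used throughout the test changes in this patch: the plain `send()` loops are replaced by helpers that publish inside a Kafka transaction, so the tests exercise the same control-record layout a transactional producer creates in production. The following is a minimal, self-contained sketch of that producer pattern; the broker address, topic name, and `transactional.id` are illustrative placeholders, not values taken from the patch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionalProducerSketch
{
  public static void main(String[] args) throws Exception
  {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");            // assumption: a local broker
    props.put("key.serializer", ByteArraySerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());
    props.put("acks", "all");
    props.put("enable.idempotence", "true");                      // prerequisite for transactions
    props.put("transactional.id", "sketch-producer-1");           // must be unique per producer instance

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      producer.initTransactions();
      producer.beginTransaction();
      for (int i = 0; i < 10; i++) {
        byte[] value = ("event-" + i).getBytes(StandardCharsets.UTF_8);
        producer.send(new ProducerRecord<>("sketch-topic", value)).get();  // assumption: topic exists
      }
      // Records become visible to read_committed consumers only after this commit; the commit
      // marker itself occupies one offset in each partition the transaction wrote to.
      producer.commitTransaction();
    }
  }
}
```

That trailing commit marker is why several expected values in the test hunks move up by one (seek-to-latest position 11 to 12, latest offsets 1100 to 1101, and the corresponding lag figures): the "latest offset" reported by the consumer now sits one past the last data record.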
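The `TestBroker` changes enable transactions on a single-node broker (`transaction.state.log.replication.factor=1`, `transaction.state.log.min.isr=1`), make the test producer idempotent and transactional, and set `isolation.level=read_committed` on the consumer side. A hedged sketch of a standalone consumer configured the same way; the topic, group id, and bootstrap address are assumptions for illustration.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ReadCommittedConsumerSketch
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");             // assumption: a local broker
    props.put("group.id", "sketch-group");
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
    props.put("auto.offset.reset", "earliest");
    props.put("isolation.level", "read_committed");               // only committed transactional
                                                                   // records are returned
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("sketch-topic"));
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        // Offsets may have gaps here: transaction markers and aborted records occupy offsets
        // that are never handed to the application.
        System.out.printf("partition=%d offset=%d%n", record.partition(), record.offset());
      }
    }
  }
}
```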
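The `SeekableStreamIndexTaskRunner` hunks remove the consecutive-offset (`skipOffsetGaps`) check and relax the exclusive end-offset comparison from `== 0` to `>= 0`. With transactional topics the consumed offsets are no longer contiguous: commit/abort markers and records from aborted transactions occupy offsets that a `read_committed` consumer never returns, so a record carrying exactly the end offset may never be observed. The sketch below restates that filtering rule with simplified types, plain `int` partition ids and `long` offsets standing in for the runner's generic sequence-number abstractions; in the actual runner the comparison goes through `createSequenceNumber(...)` so the same rule applies to Kafka offsets and Kinesis sequence numbers alike.

```java
import java.util.Collections;
import java.util.Map;

public final class EndOffsetFilterSketch
{
  private final Map<Integer, Long> endOffsets;   // exclusive end offset per partition

  public EndOffsetFilterSketch(Map<Integer, Long> endOffsets)
  {
    this.endOffsets = endOffsets;
  }

  // ">=" rather than "==": with read_committed consumption the polled offsets can jump past the
  // configured end offset (markers and aborted records take up offsets but are never returned),
  // so waiting for an exact match could read beyond the end or never terminate.
  public boolean shouldSkip(int partition, long offset)
  {
    return offset >= endOffsets.get(partition);
  }

  public static void main(String[] args)
  {
    EndOffsetFilterSketch filter = new EndOffsetFilterSketch(Collections.singletonMap(0, 5L));
    System.out.println(filter.shouldSkip(0, 4L));   // false: still before the end offset
    System.out.println(filter.shouldSkip(0, 6L));   // true: skipped over offset 5 because of a gap
  }
}
```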