From b8bbb1e5466684b1a2edd3c9fc6f7600d4da3d8e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?=
Date: Fri, 11 Dec 2020 16:12:36 -0800
Subject: [PATCH 01/14] Introduce KafkaRecordEntity to support Kafka headers in
 InputFormats

Today Kafka message support in streaming indexing tasks is limited to
message values, and does not provide a way to expose Kafka headers,
timestamps, or keys, which may be of interest to more specialized Druid
input formats. For instance, Kafka headers may be used to indicate the
payload format/encoding or additional metadata, and timestamps are often
omitted from values in Kafka Streams applications, since they are
already included in the record.

This change proposes to introduce KafkaRecordEntity as an InputEntity,
which gives input formats full access to the underlying Kafka record,
including headers, key, and timestamp. It also opens access to low-level
information such as topic, partition, and offset if needed.

KafkaRecordEntity is a subclass of ByteEntity for backwards
compatibility with existing input formats, and to avoid introducing
unnecessary complexity for Kinesis indexing tasks.
---
 .../data/input/kafka/KafkaRecordEntity.java   | 39 ++++++++++++++
 ...ementalPublishingKafkaIndexTaskRunner.java | 13 ++---
 .../druid/indexing/kafka/KafkaIndexTask.java  |  5 +-
 .../indexing/kafka/KafkaRecordSupplier.java   | 10 ++--
 .../kafka/supervisor/KafkaSupervisor.java     |  9 ++--
 .../indexing/kafka/KafkaIndexTaskTest.java    | 52 +++++++++----------
 .../kafka/KafkaRecordSupplierTest.java        | 26 ++++++----
 .../kafka/supervisor/KafkaSupervisorTest.java |  3 +-
 .../indexing/kinesis/KinesisIndexTask.java    |  5 +-
 .../kinesis/KinesisIndexTaskRunner.java       |  9 ++--
 .../kinesis/KinesisRecordSupplier.java        | 21 ++++----
 .../kinesis/supervisor/KinesisSupervisor.java |  9 ++--
 .../kinesis/KinesisIndexTaskTest.java         | 15 +++---
 .../kinesis/KinesisRecordSupplierTest.java    | 38 ++++----------
 .../kinesis/KinesisSamplerSpecTest.java       | 13 ++---
 .../supervisor/KinesisSupervisorTest.java     |  3 +-
 .../RecordSupplierInputSource.java            | 16 +++---
 .../SeekableStreamIndexTask.java              | 11 ++--
 .../SeekableStreamIndexTaskRunner.java        | 28 +++++-----
 .../SeekableStreamSamplerSpec.java            |  6 +--
 .../seekablestream/SequenceMetadata.java      | 12 ++---
 .../seekablestream/StreamChunkParser.java     | 14 ++---
 .../common/OrderedPartitionableRecord.java    | 19 ++++---
 .../seekablestream/common/RecordSupplier.java |  5 +-
 .../supervisor/SeekableStreamSupervisor.java  | 21 ++++----
 .../RecordSupplierInputSourceTest.java        | 11 ++--
 .../SeekableStreamIndexTaskTestBase.java      | 49 +++++++++++++----
 .../SeekableStreamSupervisorStateTest.java    | 17 +++---
 28 files changed, 275 insertions(+), 204 deletions(-)
 create mode 100644 extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
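As a concrete illustration of the header use case described above, a reader could pick a payload decoder based on a header. This is a hypothetical sketch, not code contained in this patch: the EncodingSniffer class and the "encoding" header name are invented for illustration; only KafkaRecordEntity, its getRecord() accessor, and the standard Kafka Headers API are assumed from the change.

// Hypothetical sketch (not part of this patch): inspect a Kafka header to
// decide how to decode the record payload. The "encoding" header name and
// this class are invented for illustration.
import java.nio.charset.StandardCharsets;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.kafka.common.header.Header;

class EncodingSniffer
{
  static String payloadEncoding(KafkaRecordEntity entity)
  {
    // KafkaRecordEntity exposes the full ConsumerRecord, including headers.
    Header header = entity.getRecord().headers().lastHeader("encoding");
    return header == null ? "utf-8" : new String(header.value(), StandardCharsets.UTF_8);
  }
}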
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
new file mode 100644
index 000000000000..37487a369e1e
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class KafkaRecordEntity extends ByteEntity
+{
+  private final ConsumerRecord<byte[], byte[]> record;
+
+  public KafkaRecordEntity(ConsumerRecord<byte[], byte[]> record)
+  {
+    super(record.value());
+    this.record = record;
+  }
+
+  public ConsumerRecord<byte[], byte[]> getRecord()
+  {
+    return record;
+  }
+}
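Because the entity keeps a reference to the whole ConsumerRecord, input formats can also reach the key, timestamp, and stream coordinates without any additional API surface. A minimal illustration follows; the helper class is invented, and everything it calls is standard Kafka client API:

// Invented helper for illustration only: summarizes the record metadata
// that KafkaRecordEntity makes reachable from an input format.
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class RecordMetadata
{
  static String describe(KafkaRecordEntity entity)
  {
    ConsumerRecord<byte[], byte[]> record = entity.getRecord();
    // topic/partition@offset plus the producer- or broker-assigned timestamp
    return record.topic() + "/" + record.partition() + "@" + record.offset()
           + " ts=" + record.timestamp() + " hasKey=" + (record.key() != null);
  }
}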
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index b65910e6cc91..e85e6fbb6da8 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
@@ -56,7 +57,7 @@
 /**
  * Kafka indexing task runner supporting incremental segments publishing
  */
-public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<Integer, Long>
+public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity>
 {
   private static final EmittingLogger log = new EmittingLogger(IncrementalPublishingKafkaIndexTaskRunner.class);
   private final KafkaIndexTask task;
@@ -85,15 +86,15 @@ protected Long getNextStartOffset(@NotNull Long sequenceNumber)
 
   @Nonnull
   @Override
-  protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
-      RecordSupplier<Integer, Long> recordSupplier,
+  protected List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> getRecords(
+      RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
       TaskToolbox toolbox
   ) throws Exception
   {
     // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
     // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
     // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
-    List<OrderedPartitionableRecord<Integer, Long>> records = new ArrayList<>();
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> records = new ArrayList<>();
     try {
       records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
     }
@@ -121,7 +122,7 @@ protected SeekableStreamEndSequenceNumbers<Integer, Long> deserializePartitionsF
 
   private void possiblyResetOffsetsOrWait(
       Map<TopicPartition, Long> outOfRangePartitions,
-      RecordSupplier<Integer, Long> recordSupplier,
+      RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
@@ -192,7 +193,7 @@ protected OrderedSequenceNumber<Long> createSequenceNumber(Long sequenceNumber)
   @Override
   protected void possiblyResetDataSourceMetadata(
       TaskToolbox toolbox,
-      RecordSupplier<Integer, Long> recordSupplier,
+      RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
       Set<StreamPartition<Integer>> assignment
   )
   {
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 7e09acf8cb89..bd98510027d5 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -25,6 +25,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -39,7 +40,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
+public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
 {
   private static final String TYPE = "index_kafka";
 
@@ -122,7 +123,7 @@ static void assignPartitions(
   }
 
   @Override
-  protected SeekableStreamIndexTaskRunner<Integer, Long> createTaskRunner()
+  protected SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> createTaskRunner()
   {
     //noinspection unchecked
     return new IncrementalPublishingKafkaIndexTaskRunner(
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index d26f9340bc79..13d106091a3a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -51,7 +52,7 @@
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
 
-public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
+public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
 {
   private final KafkaConsumer<byte[], byte[]> consumer;
   private boolean closed;
@@ -119,15 +120,16 @@ public Set<StreamPartition<Integer>> getAssignment()
 
   @Nonnull
   @Override
-  public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+  public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
   {
-    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+
       polledRecords.add(new OrderedPartitionableRecord<>(
           record.topic(),
           record.partition(),
           record.offset(),
-          record.value() == null ? null : ImmutableList.of(record.value())
+          record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record))
       ));
     }
     return polledRecords;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 021d91664f1b..d592f789953f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,6 +24,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -74,7 +75,7 @@
  * tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of
  * Kafka offsets.
  */
-public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
+public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, KafkaRecordEntity>
 {
   public static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS_TYPE_REF =
       new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
       {
@@ -121,7 +122,7 @@ public KafkaSupervisor(
 
 
   @Override
-  protected RecordSupplier<Integer, Long> setupRecordSupplier()
+  protected RecordSupplier<Integer, Long, KafkaRecordEntity> setupRecordSupplier()
   {
     return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper);
   }
@@ -199,7 +200,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
   }
 
   @Override
-  protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(
+  protected List<SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>> createIndexTasks(
       int replicas,
      String baseSequenceName,
      ObjectMapper sortingMapper,
@@ -217,7 +218,7 @@ protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(
     // Kafka index task will pick up LegacyKafkaIndexTaskRunner without the below configuration.
context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true); - List> taskList = new ArrayList<>(); + List> taskList = new ArrayList<>(); for (int i = 0; i < replicas; i++) { String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName); taskList.add(new KafkaIndexTask( diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index fa86c49933a6..9c282f286a38 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -199,41 +199,41 @@ public static Iterable constructorFeeder() private static List> generateRecords(String topic) { return ImmutableList.of( - new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2009", "b", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2010", "c", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2011", "d", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2011", "e", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")), new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")), new ProducerRecord<>(topic, 0, null, null), - new ProducerRecord<>(topic, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "10", "notanumber", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "10", "20.0", "notanumber")), - new ProducerRecord<>(topic, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0")) + new ProducerRecord<>(topic, 0, null, jbb("2013", "f", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2049", "f", "y", "notanumber", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2049", "f", "y", "10", "notanumber", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2049", "f", "y", "10", "20.0", "notanumber")), + new ProducerRecord<>(topic, 1, null, jbb("2012", "g", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 1, null, jbb("2011", "h", "y", "10", "20.0", "1.0")) ); } private static List> generateSinglePartitionRecords(String topic) { return ImmutableList.of( - new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")), - new 
ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")), - new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0")) + new ProducerRecord<>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2009", "b", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2010", "c", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2011", "d", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2011", "D", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2012", "e", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2009", "B", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2008", "A", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2009", "B", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2010", "C", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2011", "D", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2011", "d", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2012", "E", "x", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jbb("2009", "b", "x", "10", "20.0", "1.0")) ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index ca152210f57a..31a3ac09044c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.curator.test.TestingCluster; +import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.kafka.test.TestBroker; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; @@ -32,6 +33,7 @@ import org.apache.druid.metadata.DynamicConfigProvider; import org.apache.druid.metadata.MapStringDynamicConfigProvider; import org.apache.druid.segment.TestHelper; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Deserializer; @@ -111,7 +113,7 @@ private static String getTopicName() return "topic-" + topicPosFix++; } - 
private List> createOrderedPartitionableRecords() + private List> createOrderedPartitionableRecords() { Map partitionToOffset = new HashMap<>(); return records.stream().map(r -> { @@ -126,7 +128,9 @@ private List> createOrderedPartitionab topic, r.partition(), offset, - r.value() == null ? null : Collections.singletonList(r.value()) + r.value() == null ? null : Collections.singletonList(new KafkaRecordEntity( + new ConsumerRecord<>(r.topic(), r.partition(), offset, r.key(), r.value()) + )) ); }).collect(Collectors.toList()); } @@ -265,9 +269,9 @@ public void testPollCustomDeserializer() throws InterruptedException, ExecutionE recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); - List> initialRecords = new ArrayList<>(createOrderedPartitionableRecords()); + List> initialRecords = new ArrayList<>(createOrderedPartitionableRecords()); - List> polledRecords = recordSupplier.poll(poll_timeout_millis); + List> polledRecords = recordSupplier.poll(poll_timeout_millis); for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) { polledRecords.addAll(recordSupplier.poll(poll_timeout_millis)); Thread.sleep(200); @@ -298,9 +302,9 @@ public void testPoll() throws InterruptedException, ExecutionException recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); - List> initialRecords = new ArrayList<>(createOrderedPartitionableRecords()); + List> initialRecords = new ArrayList<>(createOrderedPartitionableRecords()); - List> polledRecords = recordSupplier.poll(poll_timeout_millis); + List> polledRecords = recordSupplier.poll(poll_timeout_millis); for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) { polledRecords.addAll(recordSupplier.poll(poll_timeout_millis)); Thread.sleep(200); @@ -339,7 +343,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); - List> polledRecords = recordSupplier.poll(poll_timeout_millis); + List> polledRecords = recordSupplier.poll(poll_timeout_millis); for (int i = 0; polledRecords.size() != 13 && i < pollRetry; i++) { polledRecords.addAll(recordSupplier.poll(poll_timeout_millis)); Thread.sleep(200); @@ -361,7 +365,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE Thread.sleep(200); } - List> initialRecords = createOrderedPartitionableRecords(); + List> initialRecords = createOrderedPartitionableRecords(); Assert.assertEquals(records.size(), polledRecords.size()); Assert.assertEquals(partitions, recordSupplier.getAssignment()); @@ -416,9 +420,9 @@ public void testSeek() throws InterruptedException, ExecutionException recordSupplier.seek(partition0, 2L); recordSupplier.seek(partition1, 2L); - List> initialRecords = createOrderedPartitionableRecords(); + List> initialRecords = createOrderedPartitionableRecords(); - List> polledRecords = recordSupplier.poll(poll_timeout_millis); + List> polledRecords = recordSupplier.poll(poll_timeout_millis); for (int i = 0; polledRecords.size() != 11 && i < pollRetry; i++) { polledRecords.addAll(recordSupplier.poll(poll_timeout_millis)); Thread.sleep(200); @@ -457,7 +461,7 @@ public void testSeekToLatest() throws InterruptedException, ExecutionException Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1)); recordSupplier.seekToLatest(partitions); - List> polledRecords = recordSupplier.poll(poll_timeout_millis); + List> polledRecords = 
recordSupplier.poll(poll_timeout_millis); Assert.assertEquals(Collections.emptyList(), polledRecords); recordSupplier.close(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 251d8376a690..1bded68998f7 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskInfoProvider; @@ -3822,7 +3823,7 @@ public TestableKafkaSupervisor( } @Override - protected RecordSupplier setupRecordSupplier() + protected RecordSupplier setupRecordSupplier() { final Map consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); consumerConfigs.put("metadata.max.age.ms", "1"); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index 0fe57015fe29..0d82d96833b1 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -26,6 +26,7 @@ import com.google.common.base.Preconditions; import com.google.inject.name.Named; import org.apache.druid.common.aws.AWSCredentialsConfig; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; @@ -33,7 +34,7 @@ import java.util.Map; -public class KinesisIndexTask extends SeekableStreamIndexTask +public class KinesisIndexTask extends SeekableStreamIndexTask { private static final String TYPE = "index_kinesis"; @@ -63,7 +64,7 @@ public KinesisIndexTask( } @Override - protected SeekableStreamIndexTaskRunner createTaskRunner() + protected SeekableStreamIndexTaskRunner createTaskRunner() { //noinspection unchecked return new KinesisIndexTaskRunner( diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java index 2c65cd036dc0..4ab66a6e1cdb 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.InputRowParser; import 
org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; @@ -48,7 +49,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; -public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner +public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner { private static final EmittingLogger log = new EmittingLogger(KinesisIndexTaskRunner.class); private static final long POLL_TIMEOUT = 100; @@ -80,8 +81,8 @@ protected String getNextStartOffset(String sequenceNumber) @Nonnull @Override - protected List> getRecords( - RecordSupplier recordSupplier, TaskToolbox toolbox + protected List> getRecords( + RecordSupplier recordSupplier, TaskToolbox toolbox ) { return recordSupplier.poll(POLL_TIMEOUT); @@ -118,7 +119,7 @@ protected OrderedSequenceNumber createSequenceNumber(String sequenceNumb @Override protected void possiblyResetDataSourceMetadata( TaskToolbox toolbox, - RecordSupplier recordSupplier, + RecordSupplier recordSupplier, Set> assignment ) { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 1c01ceb5dc75..f37a88331d96 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -48,6 +48,7 @@ import com.google.common.collect.Queues; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsUtils; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; @@ -92,7 +93,7 @@ * This class implements a local buffer for storing fetched Kinesis records. Fetching is done * in background threads. 
*/ -public class KinesisRecordSupplier implements RecordSupplier +public class KinesisRecordSupplier implements RecordSupplier { private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class); private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000; @@ -234,7 +235,7 @@ private Runnable fetchRecords() // used for retrying on InterruptedException GetRecordsResult recordsResult = null; - OrderedPartitionableRecord currRecord; + OrderedPartitionableRecord currRecord; try { @@ -267,7 +268,7 @@ private Runnable fetchRecords() // list will come back empty if there are no records for (Record kinesisRecord : recordsResult.getRecords()) { - final List data; + final List data; if (deaggregate) { @@ -282,10 +283,10 @@ private Runnable fetchRecords() ); for (Object userRecord : userRecords) { - data.add(toByteArray((ByteBuffer) getDataHandle.invoke(userRecord))); + data.add(new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord))); } } else { - data = Collections.singletonList(toByteArray(kinesisRecord.getData())); + data = Collections.singletonList(new ByteEntity(kinesisRecord.getData())); } currRecord = new OrderedPartitionableRecord<>( @@ -302,7 +303,7 @@ private Runnable fetchRecords() currRecord.getPartitionId(), currRecord.getSequenceNumber(), records.remainingCapacity(), - currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList()) + currRecord.getData().stream().map(b -> StringUtils.fromUtf8(b.getBuffer())).collect(Collectors.toList()) ); // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting @@ -427,7 +428,7 @@ private long getPartitionTimeLag() private final ConcurrentMap, PartitionResource> partitionResources = new ConcurrentHashMap<>(); - private BlockingQueue> records; + private BlockingQueue> records; private final boolean backgroundFetchEnabled; private volatile boolean closed = false; @@ -636,14 +637,14 @@ public String getPosition(StreamPartition partition) @Nonnull @Override - public List> poll(long timeout) + public List> poll(long timeout) { start(); try { int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll); - List> polledRecords = new ArrayList<>(expectedSize); + List> polledRecords = new ArrayList<>(expectedSize); Queues.drain( records, @@ -928,7 +929,7 @@ private void filterBufferAndResetBackgroundFetch(Set> pa } // filter records in buffer and only retain ones whose partition was not seeked - BlockingQueue> newQ = new LinkedBlockingQueue<>(recordBufferSize); + BlockingQueue> newQ = new LinkedBlockingQueue<>(recordBufferSize); records.stream() .filter(x -> !partitions.contains(x.getStreamPartition())) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index ba5129a3d93d..a7bc5997d655 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexing.common.task.Task; import 
org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata; @@ -72,7 +73,7 @@ * tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of * Kinesis sequences. */ -public class KinesisSupervisor extends SeekableStreamSupervisor +public class KinesisSupervisor extends SeekableStreamSupervisor { private static final EmittingLogger log = new EmittingLogger(KinesisSupervisor.class); @@ -150,7 +151,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( } @Override - protected List> createIndexTasks( + protected List> createIndexTasks( int replicas, String baseSequenceName, ObjectMapper sortingMapper, @@ -164,7 +165,7 @@ protected List> createIndexTasks( final Map context = createBaseTaskContexts(); context.put(CHECKPOINTS_CTX_KEY, checkpoints); - List> taskList = new ArrayList<>(); + List> taskList = new ArrayList<>(); for (int i = 0; i < replicas; i++) { String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName); taskList.add(new KinesisIndexTask( @@ -182,7 +183,7 @@ protected List> createIndexTasks( @Override - protected RecordSupplier setupRecordSupplier() throws RuntimeException + protected RecordSupplier setupRecordSupplier() throws RuntimeException { KinesisSupervisorIOConfig ioConfig = spec.getIoConfig(); KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 2c32ece55e77..8e13236438ac 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -40,6 +40,7 @@ import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.indexing.NoopIndexingServiceClient; import org.apache.druid.common.aws.AWSCredentialsConfig; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; @@ -153,7 +154,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase private static final String SHARD_ID1 = "1"; private static final String SHARD_ID0 = "0"; private static KinesisRecordSupplier recordSupplier; - private static List> records; + private static List> records; private static ServiceEmitter emitter; @@ -249,7 +250,7 @@ public static void tearDownClass() throws Exception emitter.close(); } - private static List> generateRecords(String stream) + private static List> generateRecords(String stream) { return ImmutableList.of( new OrderedPartitionableRecord<>(stream, "1", "0", jbl("2008", "a", "y", "10", "20.0", "1.0")), @@ -267,15 +268,15 @@ private static List> generateRecords( stream, "1", "6", - Collections.singletonList(StringUtils.toUtf8("unparseable")) + Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable"))) ), new OrderedPartitionableRecord<>( stream, "1", "7", - Collections.singletonList(StringUtils.toUtf8("unparseable2")) + Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable2"))) ), - new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))), + 
new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}")))), new OrderedPartitionableRecord<>(stream, "1", "9", jbl("2013", "f", "y", "10", "20.0", "1.0")), new OrderedPartitionableRecord<>(stream, "1", "10", jbl("2049", "f", "y", "notanumber", "20.0", "1.0")), new OrderedPartitionableRecord<>(stream, "1", "11", jbl("2049", "f", "y", "10", "notanumber", "1.0")), @@ -285,7 +286,7 @@ private static List> generateRecords( ); } - private static List> generateSinglePartitionRecords(String stream) + private static List> generateSinglePartitionRecords(String stream) { return ImmutableList.of( new OrderedPartitionableRecord<>(stream, "1", "0", jbl("2008", "a", "y", "10", "20.0", "1.0")), @@ -2579,7 +2580,7 @@ public void testEndOfShard() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - List> eosRecord = ImmutableList.of( + List> eosRecord = ImmutableList.of( new OrderedPartitionableRecord<>(STREAM, SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER, null) ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 3cdd49176b7d..94cdec5670ba 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -34,6 +34,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; @@ -95,7 +96,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport x.getSequenceNumber(), Collections .singletonList( - toByteArray( + new ByteEntity( x.getData())) )) .collect( @@ -108,7 +109,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport x.getSequenceNumber(), Collections .singletonList( - toByteArray( + new ByteEntity( x.getData())) )) .collect( @@ -238,7 +239,7 @@ private static GetRecordsRequest generateGetRecordsReq(String shardIterator, int } // filter out EOS markers - private static List> cleanRecords(List> records) + private static List> cleanRecords(List> records) { return records.stream() .filter(x -> !x.getSequenceNumber() @@ -312,7 +313,7 @@ public void testPoll() throws InterruptedException Thread.sleep(100); } - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -400,7 +401,7 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException Thread.sleep(100); } - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -479,7 +480,7 @@ public void testSeek() Thread.sleep(100); } - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -644,7 +645,7 @@ public void testPollAfterSeek() Thread.sleep(100); } - 
OrderedPartitionableRecord firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); + OrderedPartitionableRecord firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); Assert.assertEquals( ALL_RECORDS.get(7), @@ -662,7 +663,7 @@ public void testPollAfterSeek() } - OrderedPartitionableRecord record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); + OrderedPartitionableRecord record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); Assert.assertEquals(ALL_RECORDS.get(9), record2); // only one partition in this test. second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS @@ -737,7 +738,7 @@ public void testPollDeaggregate() throws InterruptedException Thread.sleep(100); } - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -886,23 +887,4 @@ public void getPartitionTimeLag() throws InterruptedException } verifyAll(); } - - /** - * Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing - * array itself. Does not modify position or limit of the buffer. - */ - private static byte[] toByteArray(final ByteBuffer buffer) - { - if (buffer.hasArray() - && buffer.arrayOffset() == 0 - && buffer.position() == 0 - && buffer.array().length == buffer.limit()) { - return buffer.array(); - } else { - final byte[] retVal = new byte[buffer.remaining()]; - buffer.duplicate().get(retVal); - return retVal; - } - } - } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index dadc6c7a944d..9f469bfe1661 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.JsonInputFormat; @@ -89,7 +90,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport private final KinesisRecordSupplier recordSupplier = mock(KinesisRecordSupplier.class); - private static List> generateRecords(String stream) + private static List> generateRecords(String stream) { return ImmutableList.of( new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), @@ -105,9 +106,9 @@ private static List> generateRecords( stream, "1", "6", - Collections.singletonList(StringUtils.toUtf8("unparseable")) + Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable"))) ), - new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))) + new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}")))) ); } @@ -274,10 +275,10 @@ public void testSample() throws Exception Assert.assertFalse(it.hasNext()); } - private static List jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1) + private static List jb(String ts, String dim1, 
String dim2, String dimLong, String dimFloat, String met1) { try { - return Collections.singletonList(new ObjectMapper().writeValueAsBytes( + return Collections.singletonList(new ByteEntity(new ObjectMapper().writeValueAsBytes( ImmutableMap.builder() .put("timestamp", ts) .put("dim1", dim1) @@ -286,7 +287,7 @@ private static List jb(String ts, String dim1, String dim2, String dimLo .put("dimFloat", dimFloat) .put("met1", met1) .build() - )); + ))); } catch (Exception e) { throw new RuntimeException(e); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 3d57a6bbe1b2..77c30d4fa383 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; @@ -5268,7 +5269,7 @@ protected String generateSequenceName( } @Override - protected RecordSupplier setupRecordSupplier() + protected RecordSupplier setupRecordSupplier() { return supervisorRecordSupplier; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java index d523f2d806df..c38757150780 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java @@ -43,15 +43,15 @@ * {@link org.apache.druid.data.input.InputSource} wrapping {@link RecordSupplier}. It will fetch data via * RecordSupplier and convert it into {@link ByteEntity}. See {@link #createEntityIterator}. */ -public class RecordSupplierInputSource extends AbstractInputSource +public class RecordSupplierInputSource extends AbstractInputSource { private final String topic; - private final RecordSupplier recordSupplier; + private final RecordSupplier recordSupplier; private final boolean useEarliestOffset; public RecordSupplierInputSource( String topic, - RecordSupplier recordSupplier, + RecordSupplier recordSupplier, boolean useEarliestOffset ) { @@ -66,7 +66,7 @@ public RecordSupplierInputSource( } } - private void assignAndSeek(RecordSupplier recordSupplier) + private void assignAndSeek(RecordSupplier recordSupplier) throws InterruptedException { final Set> partitions = recordSupplier @@ -113,15 +113,15 @@ protected InputSourceReader formattableReader( } /** - * Returns an iterator converting each byte array from RecordSupplier into a ByteEntity. Note that the + * Returns an iterator converting a RecordSupplier into an iterator of ByteEntity. Note that the * returned iterator will be blocked until the RecordSupplier gives any data. 
*/ CloseableIterator createEntityIterator() { return new CloseableIterator() { - private Iterator> recordIterator; - private Iterator bytesIterator; + private Iterator> recordIterator; + private Iterator bytesIterator; private volatile boolean closed; private void waitNextIteratorIfNecessary() @@ -146,7 +146,7 @@ public boolean hasNext() @Override public InputEntity next() { - return new ByteEntity(bytesIterator.next()); + return bytesIterator.next(); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 54292d8c9fb8..d3436ea3ef34 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -25,6 +25,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator; import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; @@ -57,7 +58,7 @@ import java.util.Map; -public abstract class SeekableStreamIndexTask +public abstract class SeekableStreamIndexTask extends AbstractTask implements ChatHandler { public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15; @@ -72,7 +73,7 @@ public abstract class SeekableStreamIndexTask> runnerSupplier; + private final Supplier> runnerSupplier; @MonotonicNonNull protected AuthorizerMapper authorizerMapper; @@ -259,9 +260,9 @@ public boolean withinMinMaxRecordTime(final InputRow row) return !beforeMinimumMessageTime && !afterMaximumMessageTime; } - protected abstract SeekableStreamIndexTaskRunner createTaskRunner(); + protected abstract SeekableStreamIndexTaskRunner createTaskRunner(); - protected abstract RecordSupplier newTaskRecordSupplier(); + protected abstract RecordSupplier newTaskRecordSupplier(); @VisibleForTesting public Appenderator getAppenderator() @@ -270,7 +271,7 @@ public Appenderator getAppenderator() } @VisibleForTesting - public SeekableStreamIndexTaskRunner getRunner() + public SeekableStreamIndexTaskRunner getRunner() { return runnerSupplier.get(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 2a2bf2d7f05f..5ca48523648a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -40,6 +40,7 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.LookupNodeService; @@ -134,7 +135,7 @@ * @param Partition Number Type * @param Sequence Number Type */ -public abstract class SeekableStreamIndexTaskRunner implements ChatHandler +public abstract class SeekableStreamIndexTaskRunner implements ChatHandler { public 
enum Status { @@ -193,7 +194,7 @@ public enum Status protected final Lock pollRetryLock = new ReentrantLock(); protected final Condition isAwaitingRetry = pollRetryLock.newCondition(); - private final SeekableStreamIndexTask task; + private final SeekableStreamIndexTask task; private final SeekableStreamIndexTaskIOConfig ioConfig; private final SeekableStreamIndexTaskTuningConfig tuningConfig; private final InputRowSchema inputRowSchema; @@ -232,7 +233,7 @@ public enum Status private volatile Throwable backgroundThreadException; public SeekableStreamIndexTaskRunner( - final SeekableStreamIndexTask task, + final SeekableStreamIndexTask task, @Nullable final InputRowParser parser, final AuthorizerMapper authorizerMapper, final LockGranularity lockGranularityToUse @@ -370,7 +371,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception ); // Now we can initialize StreamChunkReader with the given toolbox. - final StreamChunkParser parser = new StreamChunkParser( + final StreamChunkParser parser = new StreamChunkParser( this.parser, inputFormat, inputRowSchema, @@ -412,7 +413,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception ); Throwable caughtExceptionOuter = null; - try (final RecordSupplier recordSupplier = task.newTaskRecordSupplier()) { + try (final RecordSupplier recordSupplier = task.newTaskRecordSupplier()) { if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) { toolbox.getDataSegmentServerAnnouncer().announce(); @@ -600,7 +601,7 @@ public void run() // calling getRecord() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException // are handled in the subclasses. - List> records = getRecords( + List> records = getRecords( recordSupplier, toolbox ); @@ -609,7 +610,7 @@ public void run() stillReading = !assignment.isEmpty(); SequenceMetadata sequenceToCheckpoint = null; - for (OrderedPartitionableRecord record : records) { + for (OrderedPartitionableRecord record : records) { final boolean shouldProcess = verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber()); log.trace( @@ -621,8 +622,7 @@ public void run() ); if (shouldProcess) { - final List valueBytess = record.getData(); - final List rows = parser.parse(valueBytess); + final List rows = parser.parse(record.getData()); boolean isPersistRequired = false; final SequenceMetadata sequenceToUse = sequences @@ -1112,7 +1112,7 @@ private void maybePersistAndPublishSequences(Supplier committerSuppli } private Set> assignPartitions( - RecordSupplier recordSupplier + RecordSupplier recordSupplier ) { final Set> assignment = new HashSet<>(); @@ -1230,7 +1230,7 @@ private boolean isMoreToReadAfterReadingRecord( } private void seekToStartingSequence( - RecordSupplier recordSupplier, + RecordSupplier recordSupplier, Set> partitions ) throws InterruptedException { @@ -1914,8 +1914,8 @@ protected abstract SeekableStreamEndSequenceNumbers> getRecords( - RecordSupplier recordSupplier, + protected abstract List> getRecords( + RecordSupplier recordSupplier, TaskToolbox toolbox ) throws Exception; @@ -1945,7 +1945,7 @@ protected abstract SeekableStreamDataSourceMetadata recordSupplier, + RecordSupplier recordSupplier, Set> assignment ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java index 5c0447704b9b..9175f9510770 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java @@ -54,7 +54,7 @@ import java.util.Map; import java.util.stream.Stream; -public abstract class SeekableStreamSamplerSpec implements SamplerSpec +public abstract class SeekableStreamSamplerSpec implements SamplerSpec { static final long POLL_TIMEOUT_MS = 100; @@ -106,7 +106,7 @@ public SamplerResponse sample() return inputSourceSampler.sample(inputSource, inputFormat, dataSchema, samplerConfig); } - protected abstract RecordSupplier createRecordSupplier(); + protected abstract RecordSupplier createRecordSupplier(); private class SeekableStreamSamplerFirehoseFactory implements FiniteFirehoseFactory { @@ -159,7 +159,7 @@ protected SeekableStreamSamplerFirehose(InputRowParser parser) ((StringInputRowParser) parser).startFileFromBeginning(); } - RecordSupplierInputSource inputSource = new RecordSupplierInputSource<>( + RecordSupplierInputSource inputSource = new RecordSupplierInputSource<>( ioConfig.getStream(), createRecordSupplier(), ioConfig.isUseEarliestSequenceNumber() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index af926203d6c2..b7cff3130ba0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -183,8 +183,8 @@ boolean isOpen() } boolean canHandle( - SeekableStreamIndexTaskRunner runner, - OrderedPartitionableRecord record + SeekableStreamIndexTaskRunner runner, + OrderedPartitionableRecord record ) { lock.lock(); @@ -240,7 +240,7 @@ public String toString() } Supplier getCommitterSupplier( - SeekableStreamIndexTaskRunner runner, + SeekableStreamIndexTaskRunner runner, String stream, Map lastPersistedOffsets ) @@ -304,7 +304,7 @@ public void run() } TransactionalSegmentPublisher createPublisher( - SeekableStreamIndexTaskRunner runner, + SeekableStreamIndexTaskRunner runner, TaskToolbox toolbox, boolean useTransaction ) @@ -319,12 +319,12 @@ TransactionalSegmentPublisher createPublisher( private class SequenceMetadataTransactionalSegmentPublisher implements TransactionalSegmentPublisher { - private final SeekableStreamIndexTaskRunner runner; + private final SeekableStreamIndexTaskRunner runner; private final TaskToolbox toolbox; private final boolean useTransaction; public SequenceMetadataTransactionalSegmentPublisher( - SeekableStreamIndexTaskRunner runner, + SeekableStreamIndexTaskRunner runner, TaskToolbox toolbox, boolean useTransaction ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java index c4089c9d9cf3..3926ae99e773 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java @@ -46,7 +46,7 @@ * Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader} * or {@link InputRowParser}. This class will be useful untill we remove the deprecated InputRowParser. 
*/ -class StreamChunkParser +class StreamChunkParser { @Nullable private final InputRowParser parser; @@ -90,7 +90,7 @@ class StreamChunkParser this.parseExceptionHandler = parseExceptionHandler; } - List parse(@Nullable List streamChunk) throws IOException + List parse(@Nullable List streamChunk) throws IOException { if (streamChunk == null || streamChunk.isEmpty()) { rowIngestionMeters.incrementThrownAway(); @@ -104,11 +104,11 @@ List parse(@Nullable List streamChunk) throws IOException } } - private List parseWithParser(InputRowParser parser, List valueBytess) + private List parseWithParser(InputRowParser parser, List valueBytess) { final FluentIterable iterable = FluentIterable .from(valueBytess) - .transformAndConcat(bytes -> parser.parseBatch(ByteBuffer.wrap(bytes))); + .transformAndConcat(bytes -> parser.parseBatch(bytes.getBuffer())); final FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( CloseableIterators.withEmptyBaggage(iterable.iterator()), @@ -121,12 +121,12 @@ private List parseWithParser(InputRowParser parser, List parseWithInputFormat( SettableByteEntityReader byteEntityReader, - List valueBytess + List valueBytess ) throws IOException { final List rows = new ArrayList<>(); - for (byte[] valueBytes : valueBytess) { - byteEntityReader.setEntity(new ByteEntity(valueBytes)); + for (ByteEntity valueBytes : valueBytess) { + byteEntityReader.setEntity(valueBytes); try (FilteringCloseableInputRowIterator rowIterator = new FilteringCloseableInputRowIterator( byteEntityReader.read(), rowFilter, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java index 4dd653e760ff..96f3063bb6da 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java @@ -20,10 +20,9 @@ package org.apache.druid.indexing.seekablestream.common; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.impl.ByteEntity; import javax.validation.constraints.NotNull; -import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -35,18 +34,18 @@ * @param partition id * @param sequence number */ -public class OrderedPartitionableRecord +public class OrderedPartitionableRecord { private final String stream; private final PartitionIdType partitionId; private final SequenceOffsetType sequenceNumber; - private final List data; + private final List data; public OrderedPartitionableRecord( String stream, PartitionIdType partitionId, SequenceOffsetType sequenceNumber, - List data + List data ) { Preconditions.checkNotNull(stream, "stream"); @@ -55,7 +54,7 @@ public OrderedPartitionableRecord( this.stream = stream; this.partitionId = partitionId; this.sequenceNumber = sequenceNumber; - this.data = data == null ? 
ImmutableList.of() : data; + this.data = data; } public String getStream() @@ -74,7 +73,7 @@ public SequenceOffsetType getSequenceNumber() } @NotNull - public List getData() + public List getData() { return data; } @@ -94,14 +93,14 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - OrderedPartitionableRecord that = (OrderedPartitionableRecord) o; + OrderedPartitionableRecord that = (OrderedPartitionableRecord) o; if (data.size() != that.data.size()) { return false; } for (int i = 0; i < data.size(); i++) { - if (!Arrays.equals(data.get(i), that.data.get(i))) { + if (data.get(i).getBuffer().equals(that.data.get(i).getBuffer())) { return false; } } @@ -114,7 +113,7 @@ public boolean equals(Object o) @Override public int hashCode() { - final int hashOfData = data.stream().map(Arrays::hashCode).collect(Collectors.toList()).hashCode(); + final int hashOfData = data.stream().map(Objects::hashCode).collect(Collectors.toList()).hashCode(); return Objects.hash(stream, partitionId, sequenceNumber, hashOfData); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java index 31c343b9ac63..64a5eebbe237 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.seekablestream.common; import com.google.common.annotations.Beta; +import org.apache.druid.data.input.impl.ByteEntity; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; @@ -36,7 +37,7 @@ * @param Sequence Number Type */ @Beta -public interface RecordSupplier extends Closeable +public interface RecordSupplier extends Closeable { /** * assigns the given partitions to this RecordSupplier @@ -82,7 +83,7 @@ public interface RecordSupplier extends Clo * @return record */ @NotNull - List> poll(long timeout); + List> poll(long timeout); /** * get the latest sequence number in stream diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1a4f35fcb05d..10ab73ba8de3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -40,6 +40,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import org.apache.commons.codec.digest.DigestUtils; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -130,7 +131,7 @@ * @param the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type * @param the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers */ -public abstract class SeekableStreamSupervisor implements Supervisor +public abstract class SeekableStreamSupervisor implements Supervisor { public static final String 
CHECKPOINTS_CTX_KEY = "checkpoints"; @@ -490,7 +491,7 @@ boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup) private int initRetryCounter = 0; private volatile DateTime firstRunTime; private volatile DateTime earlyStopTime = null; - protected volatile RecordSupplier recordSupplier; + protected volatile RecordSupplier recordSupplier; private volatile boolean started = false; private volatile boolean stopped = false; private volatile boolean lifecycleStarted = false; @@ -1321,7 +1322,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti taskCount++; @SuppressWarnings("unchecked") - final SeekableStreamIndexTask seekableStreamIndexTask = (SeekableStreamIndexTask) task; + final SeekableStreamIndexTask seekableStreamIndexTask = (SeekableStreamIndexTask) task; final String taskId = task.getId(); // Check if the task has any inactive partitions. If so, terminate the task. Even if some of the @@ -1777,8 +1778,8 @@ private void verifySameSequenceNameForAllTasksInGroup(int groupId) return false; } @SuppressWarnings("unchecked") - SeekableStreamIndexTask task = - (SeekableStreamIndexTask) taskOptional.get(); + SeekableStreamIndexTask task = + (SeekableStreamIndexTask) taskOptional.get(); return task.getIOConfig().getBaseSequenceName(); }) .allMatch(taskSeqName -> taskSeqName.equals(taskGroupSequenceName)); @@ -1818,8 +1819,8 @@ public boolean isTaskCurrent(int taskGroupId, String taskId) } @SuppressWarnings("unchecked") - SeekableStreamIndexTask task = - (SeekableStreamIndexTask) taskOptional.get(); + SeekableStreamIndexTask task = + (SeekableStreamIndexTask) taskOptional.get(); // We recompute the sequence name hash for the supervisor's own configuration and compare this to the hash created // by rehashing the task's sequence name using the most up-to-date class definitions of tuning config and @@ -3123,7 +3124,7 @@ private void createTasksForGroup(int groupId, int replicas) ioConfig ); - List> taskList = createIndexTasks( + List> taskList = createIndexTasks( replicas, group.baseSequenceName, sortingMapper, @@ -3352,7 +3353,7 @@ protected abstract SeekableStreamIndexTaskIOConfig createTaskIoConfig( * * @throws JsonProcessingException */ - protected abstract List> createIndexTasks( + protected abstract List> createIndexTasks( int replicas, String baseSequenceName, ObjectMapper sortingMapper, @@ -3459,7 +3460,7 @@ protected abstract Map getTimeLagPerPartition( * * @return specific instance of Kafka/Kinesis RecordSupplier */ - protected abstract RecordSupplier setupRecordSupplier(); + protected abstract RecordSupplier setupRecordSupplier(); /** * creates a specific instance of Kafka/Kinesis Supervisor Report Payload diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java index 10d6bf857a45..fba76878d7ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import 
org.apache.druid.data.input.impl.TimestampSpec; @@ -97,7 +98,7 @@ public void testRead() throws IOException Assert.assertTrue(supplier.isClosed()); } - private static class RandomCsvSupplier implements RecordSupplier + private static class RandomCsvSupplier implements RecordSupplier { private static final int STR_LEN = 8; @@ -134,7 +135,7 @@ public void seekToLatest(Set> streamPartitions) @NotNull @Override - public List> poll(long timeout) + public List> poll(long timeout) { final long sleepTime = random.nextInt((int) timeout); try { @@ -147,12 +148,12 @@ public List> poll(long timeout) return Collections.emptyList(); } else { final int numRecords = random.nextInt(8); // can be 0 - final List> records = new ArrayList<>(numRecords); + final List> records = new ArrayList<>(numRecords); for (int i = 0; i < numRecords; i++) { final int partitionId = random.nextInt(partitionToOffset.size()); final int offset = partitionToOffset.get(partitionId); final int numBytes = random.nextInt(3); // can be 0 - final List bytes = IntStream + final List bytes = IntStream .range(0, numBytes) .mapToObj(j -> { final List columns = new ArrayList<>(NUM_COLS); @@ -160,7 +161,7 @@ public List> poll(long timeout) for (int k = 0; k < NUM_COLS - 1; k++) { columns.add(RandomStringUtils.random(STR_LEN, true, false)); } - return StringUtils.toUtf8(String.join(",", columns)); + return new ByteEntity(StringUtils.toUtf8(String.join(",", columns))); }) .collect(Collectors.toList()); records.add(new OrderedPartitionableRecord<>("topic", partitionId, offset, bytes)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index eff6609cd106..30af2801e0f9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -33,6 +33,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.JSONParseSpec; @@ -86,6 +87,7 @@ import org.joda.time.Interval; import org.junit.Assert; +import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; @@ -191,7 +193,7 @@ public SeekableStreamIndexTaskTestBase( this.lockGranularity = lockGranularity; } - protected static byte[] jb( + protected static ByteEntity jb( String timestamp, String dim1, String dim2, @@ -203,7 +205,7 @@ protected static byte[] jb( return jb(false, timestamp, dim1, dim2, dimLong, dimFloat, met1); } - protected static byte[] jb(boolean prettyPrint, + protected static byte[] jbb( String timestamp, String dim1, String dim2, @@ -212,16 +214,43 @@ protected static byte[] jb(boolean prettyPrint, String met1 ) { - return StringUtils.toUtf8(toJsonString(prettyPrint, - timestamp, - dim1, - dim2, - dimLong, - dimFloat, - met1)); + return jbb(false, timestamp, dim1, dim2, dimLong, dimFloat, met1); } - protected static List jbl( + protected static ByteEntity jb(boolean prettyPrint, + String timestamp, + String dim1, + String dim2, + String dimLong, 
+ String dimFloat, + String met1 + ) + { + return new ByteEntity(jbb(prettyPrint, timestamp, dim1, dim2, dimLong, dimFloat, met1)); + } + + protected static byte[] jbb( + boolean prettyPrint, + String timestamp, + String dim1, + String dim2, + String dimLong, + String dimFloat, + String met1 + ) + { + return StringUtils.toUtf8(toJsonString( + prettyPrint, + timestamp, + dim1, + dim2, + dimLong, + dimFloat, + met1 + )); + } + + protected static List jbl( String timestamp, String dim1, String dim2, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 2cefebd6a227..d0b810737d3d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; @@ -110,7 +111,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport private SeekableStreamIndexTaskClientFactory taskClientFactory; private SeekableStreamSupervisorSpec spec; private SeekableStreamIndexTaskClient indexTaskClient; - private RecordSupplier recordSupplier; + private RecordSupplier recordSupplier; private RowIngestionMetersFactory rowIngestionMetersFactory; private SupervisorStateManagerConfig supervisorConfig; @@ -128,7 +129,7 @@ public void setupTest() taskClientFactory = createMock(SeekableStreamIndexTaskClientFactory.class); spec = createMock(SeekableStreamSupervisorSpec.class); indexTaskClient = createMock(SeekableStreamIndexTaskClient.class); - recordSupplier = (RecordSupplier) createMock(RecordSupplier.class); + recordSupplier = (RecordSupplier) createMock(RecordSupplier.class); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -918,7 +919,7 @@ public String toString() }; } - private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask + private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask { public TestSeekableStreamIndexTask( String id, @@ -942,13 +943,13 @@ public TestSeekableStreamIndexTask( } @Override - protected SeekableStreamIndexTaskRunner createTaskRunner() + protected SeekableStreamIndexTaskRunner createTaskRunner() { return null; } @Override - protected RecordSupplier newTaskRecordSupplier() + protected RecordSupplier newTaskRecordSupplier() { return recordSupplier; } @@ -960,7 +961,7 @@ public String getType() } } - private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor + private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor { private BaseTestSeekableStreamSupervisor() { @@ -1030,7 +1031,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( } @Override - protected List> createIndexTasks( + protected List> createIndexTasks( int replicas, String baseSequenceName, ObjectMapper sortingMapper, @@ -1104,7 +1105,7 @@ protected Map getTimeLagPerPartition(Map currentOf } @Override - 
protected RecordSupplier setupRecordSupplier() + protected RecordSupplier setupRecordSupplier() { return SeekableStreamSupervisorStateTest.this.recordSupplier; } From b687f1624a92dc9eaf7e8769219a09b6c204d0b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Tue, 5 Jan 2021 20:27:27 -0800 Subject: [PATCH 02/14] fix type bounds in tests --- .../sampler/InputSourceSamplerTest.java | 35 ++++++++++--------- .../seekablestream/StreamChunkParserTest.java | 13 +++---- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index e02ea9b4ff25..c792b2ac39ed 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.FirehoseFactoryToInputSourceAdaptor; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DelimitedParseSpec; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -1474,10 +1475,10 @@ private static boolean equalsStringOrInteger(Object val1, Object val2) return false; } - private static class TestRecordSupplier implements RecordSupplier + private static class TestRecordSupplier implements RecordSupplier { private final List jsonList; - private Set partitions; + private final Set partitions; private boolean polled; public TestRecordSupplier(List jsonList) @@ -1488,34 +1489,34 @@ public TestRecordSupplier(List jsonList) } @Override - public void assign(Set set) + public void assign(Set> set) { } @Override - public void seek(StreamPartition partition, Object sequenceNumber) + public void seek(StreamPartition partition, Long sequenceNumber) { } @Override - public void seekToEarliest(Set set) + public void seekToEarliest(Set> set) { } @Override - public void seekToLatest(Set set) + public void seekToLatest(Set> set) { } @Override - public Collection getAssignment() + public Collection> getAssignment() { return null; } @Nonnull @Override - public List poll(long timeout) + public List> poll(long timeout) { if (polled) { try { @@ -1528,35 +1529,37 @@ public List poll(long timeout) polled = true; return jsonList.stream() - .map(jsonText -> new OrderedPartitionableRecord("topic", - 0, - 0, - Collections.singletonList(StringUtils.toUtf8(jsonText)))) + .map(jsonText -> new OrderedPartitionableRecord<>( + "topic", + 0, + 0L, + Collections.singletonList(new ByteEntity(StringUtils.toUtf8(jsonText))) + )) .collect(Collectors.toList()); } @Nullable @Override - public Object getLatestSequenceNumber(StreamPartition partition) + public Long getLatestSequenceNumber(StreamPartition partition) { return null; } @Nullable @Override - public Object getEarliestSequenceNumber(StreamPartition partition) + public Long getEarliestSequenceNumber(StreamPartition partition) { return null; } @Override - public Object getPosition(StreamPartition partition) + public Long getPosition(StreamPartition partition) { return null; } @Override - public Set getPartitionIds(String stream) + public Set getPartitionIds(String stream) { return partitions; } diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java index 294619a08937..7e0ece635dc1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java @@ -24,6 +24,7 @@ import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.JSONParseSpec; @@ -82,7 +83,7 @@ public void testWithParserAndNullInputformatParseProperly() throws IOException ), StringUtils.UTF8_STRING ); - final StreamChunkParser chunkParser = new StreamChunkParser( + final StreamChunkParser chunkParser = new StreamChunkParser<>( parser, // Set nulls for all parameters below since inputFormat will be never used. null, @@ -100,7 +101,7 @@ public void testWithParserAndNullInputformatParseProperly() throws IOException public void testWithNullParserAndInputformatParseProperly() throws IOException { final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap(), null); - final StreamChunkParser chunkParser = new StreamChunkParser( + final StreamChunkParser chunkParser = new StreamChunkParser<>( null, inputFormat, new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()), @@ -118,7 +119,7 @@ public void testWithNullParserAndNullInputformatFailToCreateParser() { expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("Either parser or inputFormat should be set"); - final StreamChunkParser chunkParser = new StreamChunkParser( + final StreamChunkParser chunkParser = new StreamChunkParser<>( null, null, null, @@ -148,7 +149,7 @@ public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws I JSONPathSpec.DEFAULT, Collections.emptyMap() ); - final StreamChunkParser chunkParser = new StreamChunkParser( + final StreamChunkParser chunkParser = new StreamChunkParser<>( parser, inputFormat, new InputRowSchema(TIMESTAMP_SPEC, DimensionsSpec.EMPTY, Collections.emptyList()), @@ -162,10 +163,10 @@ public void testBothParserAndInputFormatParseProperlyUsingInputFormat() throws I Assert.assertTrue(inputFormat.props.used); } - private void parseAndAssertResult(StreamChunkParser chunkParser) throws IOException + private void parseAndAssertResult(StreamChunkParser chunkParser) throws IOException { final String json = "{\"timestamp\": \"2020-01-01\", \"dim\": \"val\", \"met\": \"val2\"}"; - List parsedRows = chunkParser.parse(Collections.singletonList(json.getBytes(StringUtils.UTF8_STRING))); + List parsedRows = chunkParser.parse(Collections.singletonList(new ByteEntity(json.getBytes(StringUtils.UTF8_STRING)))); Assert.assertEquals(1, parsedRows.size()); InputRow row = parsedRows.get(0); Assert.assertEquals(DateTimes.of("2020-01-01"), row.getTimestamp()); From 809de314f629ac573bcbc4c35a35d8c99fb5e26a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 6 Jan 2021 10:00:06 -0800 Subject: [PATCH 03/14] fix checkstyle --- .../indexing/seekablestream/SeekableStreamIndexTaskTestBase.java | 1 - 1 file changed, 1 deletion(-) diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 30af2801e0f9..e2f226a42769 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -87,7 +87,6 @@ import org.joda.time.Interval; import org.junit.Assert; -import javax.annotation.Nonnull; import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; From 0b0911e0c29143d9d9d0ee62e0b2b606c3cd8739 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 6 Jan 2021 14:58:44 -0800 Subject: [PATCH 04/14] restore null message handling --- .../org/apache/druid/indexing/kafka/KafkaRecordSupplier.java | 2 +- .../seekablestream/common/OrderedPartitionableRecord.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 13d106091a3a..1397cddf22dc 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -129,7 +129,7 @@ public List> poll(l record.topic(), record.partition(), record.offset(), - ImmutableList.of(new KafkaRecordEntity(record)) + record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record)) )); } return polledRecords; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java index 96f3063bb6da..17e5b013df2a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.seekablestream.common; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.impl.ByteEntity; import javax.validation.constraints.NotNull; @@ -54,7 +55,7 @@ public OrderedPartitionableRecord( this.stream = stream; this.partitionId = partitionId; this.sequenceNumber = sequenceNumber; - this.data = data; + this.data = data == null ? 
ImmutableList.of() : data; } public String getStream() From a42d71d72e49c00c0d1a058e8aa13d0e8024dd51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 6 Jan 2021 15:02:53 -0800 Subject: [PATCH 05/14] remove unused method --- .../kinesis/KinesisRecordSupplier.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index f37a88331d96..9a71196987de 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -115,24 +115,6 @@ private static boolean isServiceExceptionRecoverable(AmazonServiceException ex) return isIOException || isTimeout || isInternalError; } - /** - * Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing - * array itself. Does not modify position or limit of the buffer. - */ - private static byte[] toByteArray(final ByteBuffer buffer) - { - if (buffer.hasArray() - && buffer.arrayOffset() == 0 - && buffer.position() == 0 - && buffer.array().length == buffer.limit()) { - return buffer.array(); - } else { - final byte[] retVal = new byte[buffer.remaining()]; - buffer.duplicate().get(retVal); - return retVal; - } - } - /** * Catch any exception and wrap it in a {@link StreamException} */ From 36e51e07f4b8233cf6397bb01cf1f5b55cde4e15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 6 Jan 2021 18:11:25 -0800 Subject: [PATCH 06/14] fix kinesis tests reading records multiple times --- .../kinesis/KinesisIndexTaskTest.java | 90 +++++++++++-------- 1 file changed, 51 insertions(+), 39 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 8e13236438ac..288846961324 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -154,7 +154,6 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase private static final String SHARD_ID1 = "1"; private static final String SHARD_ID0 = "0"; private static KinesisRecordSupplier recordSupplier; - private static List> records; private static ServiceEmitter emitter; @@ -217,7 +216,6 @@ public void setupTest() throws IOException, InterruptedException maxParseExceptions = null; maxSavedParseExceptions = null; doHandoff = true; - records = generateRecords(STREAM); reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" + System.currentTimeMillis(), "json"); maxRecordsPerPoll = 1; @@ -250,6 +248,20 @@ public static void tearDownClass() throws Exception emitter.close(); } + // records can only be read once, hence we generate fresh records every time + private static List> generateRecords(int start) { + final List> records = generateRecords(STREAM); + return records.subList(start, records.size()); + } + + private static List> generateRecords(int start, int end) { + return 
generateRecords(STREAM).subList(start, end); + } + + private List> generateSinglePartitionRecords(int start, int end) { + return generateSinglePartitionRecords(STREAM).subList(start, end); + } + private static List> generateRecords(String stream) { return ImmutableList.of( @@ -318,7 +330,7 @@ public void testRunAfterDataInserted() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 5)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 5)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -385,7 +397,7 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 5)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 5)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -460,7 +472,7 @@ public void testRunBeforeDataInserted() throws Exception EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(Collections.emptyList()) .times(5) - .andReturn(records.subList(13, 15)) + .andReturn(generateRecords(13, 15)) .once(); recordSupplier.close(); @@ -532,9 +544,9 @@ public void testIncrementalHandOff() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 5)) + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 5)) .once() - .andReturn(records.subList(4, records.size())) + .andReturn(generateRecords(4)) .once(); recordSupplier.close(); @@ -644,11 +656,11 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 3)) + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 3)) .once() - .andReturn(records.subList(2, 10)) + .andReturn(generateRecords(2, 10)) .once() - .andReturn(records.subList(9, 11)); + .andReturn(generateRecords(9, 11)); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -775,7 +787,7 @@ public void testRunWithMinimumMessageTime() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 13)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 13)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -845,7 +857,7 @@ public void testRunWithMaximumMessageTime() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 13)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 13)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -917,7 +929,7 @@ public void testRunWithTransformSpec() throws Exception recordSupplier.seek(EasyMock.anyObject(), 
EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 13)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 13)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -995,7 +1007,7 @@ public void testRunOnSingletonRange() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 3)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 3)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -1054,7 +1066,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -1124,7 +1136,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -1197,7 +1209,7 @@ public void testReportParseExceptions() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -1257,7 +1269,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -1357,7 +1369,7 @@ public void testMultipleParseExceptionsFailure() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -1440,7 +1452,9 @@ public void testRunReplicas() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).times(2); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())) + .andReturn(generateRecords(2, 13)).once() + .andReturn(generateRecords(2, 13)).once(); recordSupplier.close(); EasyMock.expectLastCall().times(2); @@ -1531,9 +1545,9 @@ public void 
testRunConflicting() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)) + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)) .once() - .andReturn(records.subList(3, 13)) + .andReturn(generateRecords(3, 13)) .once(); recordSupplier.close(); @@ -1623,9 +1637,9 @@ public void testRunConflictingWithoutTransactions() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)) + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)) .once() - .andReturn(records.subList(3, 13)) + .andReturn(generateRecords(3, 13)) .once(); recordSupplier.close(); @@ -1716,7 +1730,7 @@ public void testRunOneTaskTwoPartitions() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, records.size())).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2)).once(); recordSupplier.close(); EasyMock.expectLastCall().once(); @@ -1792,9 +1806,9 @@ public void testRunTwoTasksTwoPartitions() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)) + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)) .once() - .andReturn(records.subList(13, 15)) + .andReturn(generateRecords(13, 15)) .once(); recordSupplier.close(); @@ -1885,7 +1899,7 @@ public void testRestore() throws Exception EasyMock.expectLastCall(); recordSupplier.seek(streamPartition, "2"); EasyMock.expectLastCall(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 4)) + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 4)) .once() .andReturn(Collections.emptyList()) .anyTimes(); @@ -1936,7 +1950,7 @@ public void testRestore() throws Exception EasyMock.expectLastCall(); recordSupplier.seek(streamPartition, "3"); EasyMock.expectLastCall(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(3, 6)).once(); + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(3, 6)).once(); recordSupplier.assign(ImmutableSet.of()); EasyMock.expectLastCall(); recordSupplier.close(); @@ -2006,7 +2020,7 @@ public void testRestoreAfterPersistingSequences() throws Exception { maxRowsPerSegment = 2; maxRecordsPerPoll = 1; - records = generateSinglePartitionRecords(STREAM); + List> records = generateSinglePartitionRecords(STREAM); recordSupplier.assign(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); @@ -2163,7 +2177,7 @@ public void testRunWithPauseAndResume() throws Exception EasyMock.expectLastCall(); recordSupplier.seek(streamPartition, "2"); EasyMock.expectLastCall(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 5)) + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 5)) .once() .andReturn(Collections.emptyList()) .anyTimes(); @@ -2274,7 +2288,7 @@ public void 
testRunContextSequenceAheadOfStartingOffsets() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)) + EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)) .once(); recordSupplier.close(); @@ -2341,8 +2355,6 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception @Test(timeout = 5000L) public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception { - records = generateSinglePartitionRecords(STREAM); - final String baseSequenceName = "sequence0"; // as soon as any segment has more than one record, incremental publishing should happen maxRowsPerSegment = 2; @@ -2353,9 +2365,9 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception EasyMock.expect(recordSupplier1.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); recordSupplier1.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier1.poll(EasyMock.anyLong())).andReturn(records.subList(0, 5)) + EasyMock.expect(recordSupplier1.poll(EasyMock.anyLong())).andReturn(generateSinglePartitionRecords(0, 5)) .once() - .andReturn(records.subList(4, 10)) + .andReturn(generateSinglePartitionRecords(4, 10)) .once(); recordSupplier1.close(); EasyMock.expectLastCall().once(); @@ -2365,9 +2377,9 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception EasyMock.expect(recordSupplier2.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); recordSupplier2.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(recordSupplier2.poll(EasyMock.anyLong())).andReturn(records.subList(0, 5)) + EasyMock.expect(recordSupplier2.poll(EasyMock.anyLong())).andReturn(generateSinglePartitionRecords(0, 5)) .once() - .andReturn(records.subList(4, 10)) + .andReturn(generateSinglePartitionRecords(4, 10)) .once(); recordSupplier2.close(); EasyMock.expectLastCall().once(); @@ -2585,7 +2597,7 @@ public void testEndOfShard() throws Exception ); EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())) - .andReturn(records.subList(2, 5)).once() + .andReturn(generateRecords(2, 5)).once() .andReturn(eosRecord).once(); recordSupplier.close(); From 876d45521f6fc512a28a90e1f14a3d5b2eac766f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Wed, 6 Jan 2021 18:12:40 -0800 Subject: [PATCH 07/14] ensure logging does not modify buffer position --- .../kinesis/KinesisRecordSupplier.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 9a71196987de..e90b464b45bb 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -279,14 +279,22 @@ private Runnable fetchRecords() ); - log.trace( - "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s", - currRecord.getStream(), - currRecord.getPartitionId(), - currRecord.getSequenceNumber(), - 
records.remainingCapacity(), - currRecord.getData().stream().map(b -> StringUtils.fromUtf8(b.getBuffer())).collect(Collectors.toList()) - ); + if (log.isTraceEnabled()) { + log.trace( + "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s", + currRecord.getStream(), + currRecord.getPartitionId(), + currRecord.getSequenceNumber(), + records.remainingCapacity(), + currRecord.getData() + .stream() + .map(b -> StringUtils.fromUtf8( + // duplicate buffer to avoid changing its position when logging + b.getBuffer().duplicate()) + ) + .collect(Collectors.toList()) + ); + } // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting // from this message and back off for a bit to let the buffer drain before retrying. From 99c37e07711aaf2735fe2ba2d3acc59b11532a9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 7 Jan 2021 09:15:00 -0800 Subject: [PATCH 08/14] make checkstyle happy --- .../druid/indexing/kinesis/KinesisIndexTaskTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 288846961324..37d699a39eb4 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -249,16 +249,19 @@ public static void tearDownClass() throws Exception } // records can only be read once, hence we generate fresh records every time - private static List> generateRecords(int start) { + private static List> generateRecords(int start) + { final List> records = generateRecords(STREAM); return records.subList(start, records.size()); } - private static List> generateRecords(int start, int end) { + private static List> generateRecords(int start, int end) + { return generateRecords(STREAM).subList(start, end); } - private List> generateSinglePartitionRecords(int start, int end) { + private List> generateSinglePartitionRecords(int start, int end) + { return generateSinglePartitionRecords(STREAM).subList(start, end); } @@ -2020,7 +2023,7 @@ public void testRestoreAfterPersistingSequences() throws Exception { maxRowsPerSegment = 2; maxRecordsPerPoll = 1; - List> records = generateSinglePartitionRecords(STREAM); + List> records = generateSinglePartitionRecords(STREAM); recordSupplier.assign(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); From abd1425191e41a3c098f6e96bfe3d6d78f0d3d3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Thu, 7 Jan 2021 09:19:07 -0800 Subject: [PATCH 09/14] add javadocs --- .../druid/data/input/kafka/KafkaRecordEntity.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java index 37487a369e1e..46ed91e2782e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java @@ -19,9 +19,21 @@ package 
org.apache.druid.data.input.kafka;
 
+import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+/**
+ * A {@link ByteEntity} generated by {@link KafkaRecordSupplier} and fed to any {@link InputFormat} used by Kafka
+ * indexing tasks.
+ * <p>
+ * It can be used as a regular ByteEntity, in which case the Kafka record value is returned, but the {@link #getRecord}
+ * method also allows Kafka-aware {@link InputFormat} implementations to read the full Kafka record, including headers,
+ * key, and timestamp.
+ * <p>
+ * This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.
+ */
 public class KafkaRecordEntity extends ByteEntity
 {
   private final ConsumerRecord<byte[], byte[]> record;
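
To make the extension point concrete: a minimal sketch of what a Kafka-aware InputFormat built on KafkaRecordEntity could look like in an extension. The names HeaderAwareInputFormat, valueFormat, and the "encoding" header check are hypothetical and are not code from this patch series; only InputFormat, InputEntityReader, KafkaRecordEntity, and ConsumerRecord are types the series actually introduces or relies on.

    // Sketch only, assuming the KafkaRecordEntity introduced above; all other
    // names here are hypothetical and exist purely for illustration.
    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import org.apache.druid.data.input.InputEntity;
    import org.apache.druid.data.input.InputEntityReader;
    import org.apache.druid.data.input.InputFormat;
    import org.apache.druid.data.input.InputRowSchema;
    import org.apache.druid.data.input.kafka.KafkaRecordEntity;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;

    public class HeaderAwareInputFormat implements InputFormat
    {
      private final InputFormat valueFormat; // delegate that parses the record value bytes

      public HeaderAwareInputFormat(InputFormat valueFormat)
      {
        this.valueFormat = valueFormat;
      }

      @Override
      public boolean isSplittable()
      {
        return false;
      }

      @Override
      public InputEntityReader createReader(InputRowSchema schema, InputEntity source, File temporaryDirectory)
      {
        // Kafka indexing tasks hand this method a KafkaRecordEntity, so the full
        // consumer record is reachable: headers, key, timestamp, topic, partition, offset.
        final ConsumerRecord<byte[], byte[]> record = ((KafkaRecordEntity) source).getRecord();
        final Header encoding = record.headers().lastHeader("encoding");
        if (encoding != null && !"application/json".equals(new String(encoding.value(), StandardCharsets.UTF_8))) {
          throw new UnsupportedOperationException("this sketch only handles JSON payloads");
        }
        // the record value itself is still parsed by the wrapped format
        return valueFormat.createReader(schema, source, temporaryDirectory);
      }
    }

Because KafkaRecordEntity is a plain ByteEntity to everyone else, the wrapped delegate keeps operating on the record value unchanged; header-awareness is purely additive.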

From f35f7d488f1d547178ada279abe37686600fdd31 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?=
Date: Thu, 7 Jan 2021 14:40:53 -0800
Subject: [PATCH 10/14] clarify the behavior of some key methods

---
 .../java/org/apache/druid/data/input/InputEntity.java       | 3 +++
 .../apache/druid/data/input/kafka/KafkaRecordEntity.java    | 2 ++
 .../common/OrderedPartitionableRecord.java                  | 9 +++++++++
 3 files changed, 14 insertions(+)

diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
index 71f14d95a3c3..679af074e00b 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
@@ -66,6 +66,9 @@ interface CleanableFile extends Closeable
    * Opens an {@link InputStream} on the input entity directly.
    * This is the basic way to read the given entity.
    *
+   * The behavior of this method is only defined for the first call to open().
+   * The behavior of subsequent calls is undefined and may vary between implementations.
+   *
    * @see #fetch
    */
   InputStream open() throws IOException;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
index 46ed91e2782e..41c2c0a03258 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
@@ -32,6 +32,8 @@
  * method also allows Kafka-aware {@link InputFormat} implementations to read the full Kafka record, including headers,
  * key, and timestamp.
  * <p>
+ * NOTE: Any records with null values will be skipped, even if they contain non-null keys or headers.
+ * <p>
  * This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.
  */
 public class KafkaRecordEntity extends ByteEntity
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
index 17e5b013df2a..74be638d41a4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
@@ -73,6 +73,15 @@ public SequenceOffsetType getSequenceNumber()
     return sequenceNumber;
   }
 
+  /**
+   * Returns a list of ByteEntities representing this record.
+   *
+   * This method will always return the same list of ByteEntity instances.
+   * Since each ByteEntity can only be read once (unless care is taken to reset its buffer positions), it
+   * is not advised to call this method multiple times or to iterate over the list more than once.
+   *
+   * @return the list of ByteEntity objects for this record
+   */
   @NotNull
   public List<ByteEntity> getData()
   {
     return data;
   }
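
The read-once behavior called out in the getData() javadoc above is easy to trip over: PATCH 06 earlier in this series regenerates Kinesis test records on every poll precisely because a record, once parsed, cannot simply be handed out again. If a record ever had to be re-read, each entity's buffer position would need to be reset first. A minimal sketch, assuming ByteEntity#getBuffer() returns the entity's underlying, shared ByteBuffer (which is what the duplicate()-before-logging fix in PATCH 07 implies):

    // Sketch only: rewind each entity of an already-consumed
    // OrderedPartitionableRecord so it can be parsed a second time.
    for (ByteEntity entity : record.getData()) {
      entity.getBuffer().rewind(); // reset position to zero; the contents are untouched
    }

Regenerating fresh records, as the tests in this series do, sidesteps this shared mutable state entirely.
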
+import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.discovery.DataNodeService; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; @@ -74,7 +91,9 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -91,6 +110,9 @@ import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.scan.ScanQueryConfig; @@ -107,6 +129,7 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.incremental.RowIngestionMetersTotals; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusher; @@ -123,6 +146,7 @@ import org.apache.druid.server.security.AuthTestUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.After; @@ -139,6 +163,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -153,6 +178,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; @SuppressWarnings("unchecked") @RunWith(Parameterized.class) @@ -160,14 +186,35 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase { private static final Logger log = new Logger(KafkaIndexTaskTest.class); private static final long POLL_RETRY_MS = 100; + private static final Iterable
+      <Header> SAMPLE_HEADERS = ImmutableList.of(new Header()
+      {
+        @Override
+        public String key()
+        {
+          return "encoding";
+        }
+
+        @Override
+        public byte[] value()
+        {
+          return "application/json".getBytes(StandardCharsets.UTF_8);
+        }
+      });
   private static TestingCluster zkServer;
   private static TestBroker kafkaServer;
   private static ServiceEmitter emitter;
   private static int topicPostfix;
 
+  static final Module TEST_MODULE = new SimpleModule("kafkaTestModule").registerSubtypes(
+      new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat")
+  );
+
   static {
-    new KafkaIndexTaskModule().getJacksonModules().forEach(OBJECT_MAPPER::registerModule);
+    Stream.concat(
+        new KafkaIndexTaskModule().getJacksonModules().stream(),
+        Stream.of(TEST_MODULE)
+    ).forEach(OBJECT_MAPPER::registerModule);
   }
 
   @Parameterized.Parameters(name = "{0}")
@@ -199,11 +246,11 @@ public static Iterable constructorFeeder()
 
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jbb("2009", "b", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jbb("2010", "c", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jbb("2011", "d", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jbb("2011", "e", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2009", "b", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2010", "c", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2011", "d", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2011", "e", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
         new ProducerRecord<>(topic, 0, null, jbb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
         new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
         new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
@@ -1113,6 +1160,87 @@ public void testRunWithTransformSpec() throws Exception
     Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", publishedDescriptors.get(0)));
   }
 
+  @Test(timeout = 60_000L)
+  public void testKafkaRecordEntityInputFormat() throws Exception
+  {
+    // Insert data
+    insertData(Iterables.limit(records, 3));
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new DataSchema(
+            "test_ds",
+            new TimestampSpec("timestamp", "iso", null),
+            new DimensionsSpec(
+                Arrays.asList(
+                    new StringDimensionSchema("dim1"),
+                    new StringDimensionSchema("dim1t"),
+                    new StringDimensionSchema("dim2"),
+                    new LongDimensionSchema("dimLong"),
+                    new FloatDimensionSchema("dimFloat"),
+                    new StringDimensionSchema("kafka.topic"),
+                    new LongDimensionSchema("kafka.offset"),
+                    new StringDimensionSchema("kafka.header.encoding")
+                ),
+                null,
+                null
+            ),
+            new AggregatorFactory[]{
+                new DoubleSumAggregatorFactory("met1sum", "met1"),
+                new CountAggregatorFactory("rows")
+            },
+            new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+            null
+        ),
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
+            new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)),
+            kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            new TestKafkaInputFormat(INPUT_FORMAT)
+        )
+    );
+    Assert.assertTrue(task.supportsQueries());
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    while (countEvents(task) != 3) {
+      Thread.sleep(25);
+    }
+
+    Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+
+    final QuerySegmentSpec interval = OBJECT_MAPPER.readValue(
+        "\"2008/2012\"", QuerySegmentSpec.class
+    );
+    List<ScanResultValue>scanResultValues = scanData(task, interval);
+    //verify that only the first 3 records have been indexed so far
+    Assert.assertEquals(3, Iterables.size(scanResultValues));
+
+    int i = 0;
+    for (ScanResultValue result : scanResultValues) {
+      final Map<String, Object> event = ((List<Map<String, Object>>) result.getEvents()).get(0);
+      Assert.assertEquals((long)i++, event.get("kafka.offset"));
+      Assert.assertEquals(topic, event.get("kafka.topic"));
+      Assert.assertEquals("application/json", event.get("kafka.header.encoding"));
+    }
+    // insert remaining data
+    insertData(Iterables.skip(records, 3));
+
+    // Wait for task to exit
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    // Check metrics
+    Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+  }
+
   @Test(timeout = 60_000L)
   public void testRunOnNothing() throws Exception
   {
@@ -2492,6 +2620,11 @@ private List scanData(final Task task, QuerySegmentSpec spec)
   }
 
   private void insertData() throws ExecutionException, InterruptedException
+  {
+    insertData(records);
+  }
+
+  private void insertData(Iterable<ProducerRecord<byte[], byte[]>> records) throws ExecutionException, InterruptedException
   {
     try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) {
       kafkaProducer.initTransactions();
@@ -2629,9 +2762,12 @@ private void makeToolboxFactory() throws IOException
     final TestUtils testUtils = new TestUtils();
     rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
     final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
+
     for (Module module : new KafkaIndexTaskModule().getJacksonModules()) {
       objectMapper.registerModule(module);
     }
+    objectMapper.registerModule(TEST_MODULE);
+
     final TaskConfig taskConfig = new TaskConfig(
         new File(directory, "baseDir").getPath(),
         new File(directory, "baseTaskDir").getPath(),
@@ -2791,15 +2927,15 @@ public void testMultipleLinesJSONText() throws Exception
           "{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }" +
           "{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
 
-      ProducerRecord[] producerRecords = new ProducerRecord[]{
+      ProducerRecord[] producerRecords = new ProducerRecord[]{
           // pretty formatted
-          new ProducerRecord<>(topic, 0, null, jb(true, "2049", "d1", "y", "10", "20.0", "1.0")),
+          new ProducerRecord<>(topic, 0, null, jbb(true, "2049", "d1", "y", "10", "20.0", "1.0")),
          //well-formed
          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)),
          //ill-formed
          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(illformed)),
          //a well-formed record after ill-formed to demonstrate that the ill-formed can be successfully skipped
-          new ProducerRecord<>(topic, 0, null, jb(true, "2049", "d7", "y", "10", "20.0", "1.0"))
+          new ProducerRecord<>(topic, 0, null, jbb(true, "2049", "d7", "y", "10", "20.0", "1.0"))
      };
      for (ProducerRecord record : producerRecords) {
        kafkaProducer.send(record).get();
@@ -2847,4 +2983,66 @@ public void testMultipleLinesJSONText() throws Exception
         newDataSchemaMetadata()
     );
   }
+
+  public static class TestKafkaInputFormat implements InputFormat
+  {
+    final InputFormat baseInputFormat;
+
+    @JsonCreator
+    public TestKafkaInputFormat(@JsonProperty("baseInputFormat") InputFormat baseInputFormat)
+    {
+      this.baseInputFormat = baseInputFormat;
+    }
+
+    @Override
+    public boolean isSplittable()
+    {
+      return false;
+    }
+
+    @Override
+    public InputEntityReader createReader(
+        InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory
+    )
+    {
+      final KafkaRecordEntity recordEntity = (KafkaRecordEntity) source;
+      final InputEntityReader delegate = baseInputFormat.createReader(inputRowSchema, recordEntity, temporaryDirectory);
+      return new InputEntityReader()
+      {
+        @Override
+        public CloseableIterator<InputRow> read() throws IOException
+        {
+          return delegate.read().map(
+              r -> {
+                MapBasedInputRow row = (MapBasedInputRow) r;
+                final Map<String, Object> event = new HashMap<>(row.getEvent());
+                event.put("kafka.offset", recordEntity.getRecord().offset());
+                event.put("kafka.topic", recordEntity.getRecord().topic());
+                event.put(
+                    "kafka.header.encoding",
+                    new String(
+                        recordEntity.getRecord().headers().lastHeader("encoding").value(),
+                        StandardCharsets.UTF_8
+                    )
+                );
+
+                return new MapBasedInputRow(row.getTimestamp(), row.getDimensions(), event);
+              }
+          );
+        }
+
+        @Override
+        public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
+        {
+          return delegate.sample();
+        }
+      };
+    }
+
+    @JsonProperty
+    public InputFormat getBaseInputFormat()
+    {
+      return baseInputFormat;
+    }
+  }
 }

From 680709bfa6661ce3c9617c1b7a4c7336f117e5de Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3%A9aut=C3%A9?=
Date: Thu, 7 Jan 2021 18:36:23 -0800
Subject: [PATCH 12/14] more checkstyle happiness

---
 .../apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 7ff620b1def8..3ca1c0ba4a67 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1218,14 +1218,14 @@ public void testKafkaRecordEntityInputFormat() throws Exception
     final QuerySegmentSpec interval = OBJECT_MAPPER.readValue(
         "\"2008/2012\"", QuerySegmentSpec.class
     );
-    List<ScanResultValue>scanResultValues = scanData(task, interval);
+    List<ScanResultValue> scanResultValues = scanData(task, interval);
     //verify that only the first 3 records have been indexed so far
     Assert.assertEquals(3, Iterables.size(scanResultValues));
 
     int i = 0;
     for (ScanResultValue result : scanResultValues) {
       final Map<String, Object> event = ((List<Map<String, Object>>) result.getEvents()).get(0);
-      Assert.assertEquals((long)i++, event.get("kafka.offset"));
+      Assert.assertEquals((long) i++, event.get("kafka.offset"));
       Assert.assertEquals(topic, event.get("kafka.topic"));
       Assert.assertEquals("application/json", event.get("kafka.header.encoding"));
     }
@@ -3001,9 +3001,7 @@ public boolean isSplittable()
     }
 
     @Override
-    public InputEntityReader createReader(
-        InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory
-    )
+    public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
     {
       final KafkaRecordEntity recordEntity = (KafkaRecordEntity) source;
       final InputEntityReader delegate = baseInputFormat.createReader(inputRowSchema, recordEntity, temporaryDirectory);

From 9ef22560ccf4d1eeda2b8a0f1bbb19fb4b1c4187 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3%A9aut=C3%A9?=
Date: Fri, 8 Jan 2021 00:22:35 -0800
Subject: [PATCH 13/14] fix OrderedPartitionableRecord equality

---
 .../seekablestream/common/OrderedPartitionableRecord.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
index 74be638d41a4..5d052585ba98 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
@@ -110,7 +110,7 @@ public boolean equals(Object o)
     }
 
     for (int i = 0; i < data.size(); i++) {
-      if (data.get(i).getBuffer().equals(that.data.get(i).getBuffer())) {
+      if (!data.get(i).getBuffer().equals(that.data.get(i).getBuffer())) {
         return false;
       }
     }
@@ -123,7 +123,7 @@ public boolean equals(Object o)
   @Override
   public int hashCode()
   {
-    final int hashOfData = data.stream().map(Objects::hashCode).collect(Collectors.toList()).hashCode();
+    final int hashOfData = data.stream().map(e -> e.getBuffer().hashCode()).collect(Collectors.toList()).hashCode();
     return Objects.hash(stream, partitionId, sequenceNumber, hashOfData);
   }
 }

From c70418d6a5143e313c591b1fb9a491496fab2e7b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Xavier=20L=C3%A9aut=C3%A9?=
Date: Fri, 8 Jan 2021 00:23:44 -0800
Subject: [PATCH 14/14] fix typo

---
 core/src/main/java/org/apache/druid/data/input/InputEntity.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
index 679af074e00b..70fd4d2d5f09 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
@@ -67,7 +67,7 @@ interface CleanableFile extends Closeable
    * This is the basic way to read the given entity.
    *
    * The behavior of this method is only defined for the first call to open().
-   * The behvior of subsequent calls is undefined and may vary between implementations.
+   * The behavior of subsequent calls is undefined and may vary between implementations.
    *
    * @see #fetch
    */
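
Editor's note on the read-once caveat documented in the OrderedPartitionableRecord#getData() javadoc earlier in this series: the sketch below is illustrative only and is not part of the patch series. It assumes nothing beyond ByteEntity#open() and ByteEntity#getBuffer() (both already relied on by the patches above) plus Guava's ByteStreams utility, which Druid bundles; the class and method names are hypothetical.

import com.google.common.io.ByteStreams;
import org.apache.druid.data.input.impl.ByteEntity;

import java.io.IOException;
import java.io.InputStream;

class ByteEntityRereadSketch
{
  // Hypothetical helper: reads the same ByteEntity twice. The first open()
  // drains the backing ByteBuffer, so the buffer position must be rewound
  // before the second pass; without rewind() the second read sees zero bytes.
  static byte[] readTwice(ByteEntity entity) throws IOException
  {
    try (InputStream in = entity.open()) {
      ByteStreams.toByteArray(in); // first pass consumes the shared buffer
    }
    entity.getBuffer().rewind();   // reset the position before re-reading
    try (InputStream in = entity.open()) {
      return ByteStreams.toByteArray(in);
    }
  }
}

The list returned by getData() is stable across calls, so the caveat is entirely about buffer positions: every reader of a ByteEntity advances the same underlying ByteBuffer, which is what the javadoc means by "unless care is taken to reset its buffer positions".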