diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
index 71f14d95a3c3..70fd4d2d5f09 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
@@ -66,6 +66,9 @@ interface CleanableFile extends Closeable
* Opens an {@link InputStream} on the input entity directly.
* This is the basic way to read the given entity.
*
+ * The behavior of this method is only defined for the first call to open().
+ * The behavior of subsequent calls is undefined and may vary between implementations.
+ *
* @see #fetch
*/
InputStream open() throws IOException;
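
[Reviewer note, not part of the patch] This contract addition matters because ByteEntity, the entity type threaded through the rest of this diff, streams over a shared ByteBuffer whose position advances as the stream is read, so a second open() can observe a drained buffer; the Kinesis test changes later in this diff regenerate their records for exactly this reason. A minimal sketch of the single-read behavior, assuming ByteEntity's buffer-backed stream:

import com.google.common.io.ByteStreams;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.druid.data.input.impl.ByteEntity;

class OpenOnceDemo
{
  public static void main(String[] args) throws Exception
  {
    ByteEntity entity = new ByteEntity("{\"k\":\"v\"}".getBytes(StandardCharsets.UTF_8));
    try (InputStream first = entity.open()) {
      System.out.println(ByteStreams.toByteArray(first).length); // 9: the full payload
    }
    try (InputStream second = entity.open()) {
      // undefined per the contract above; with a drained backing buffer this prints 0
      System.out.println(ByteStreams.toByteArray(second).length);
    }
  }
}
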
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
new file mode 100644
index 000000000000..41c2c0a03258
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * A {@link ByteEntity} generated by {@link KafkaRecordSupplier} and fed to any {@link InputFormat} used by Kafka
+ * indexing tasks.
+ *
+ * It can be used as a regular ByteEntity, in which case the Kafka record value is returned, but the {@link #getRecord}
+ * method also allows Kafka-aware {@link InputFormat} implementations to read the full Kafka record, including headers,
+ * key, and timestamp.
+ *
+ * NOTE: Any records with null values will be skipped, even if they contain non-null keys or headers.
+ *
+ * This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.
+ */
+public class KafkaRecordEntity extends ByteEntity
+{
+ private final ConsumerRecord<byte[], byte[]> record;
+
+ public KafkaRecordEntity(ConsumerRecord<byte[], byte[]> record)
+ {
+ super(record.value());
+ this.record = record;
+ }
+
+ public ConsumerRecord<byte[], byte[]> getRecord()
+ {
+ return record;
+ }
+}
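
[Reviewer note, not part of the patch] A sketch of what a Kafka-aware InputFormat reader can now derive per record from a KafkaRecordEntity. The helper class is hypothetical, and the "encoding" header name simply mirrors the test added below:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.kafka.common.header.Header;

class KafkaMetadataSketch
{
  // hypothetical helper: per-record Kafka metadata an InputFormat could add to rows
  static Map<String, Object> extract(KafkaRecordEntity entity)
  {
    final Map<String, Object> metadata = new HashMap<>();
    metadata.put("kafka.topic", entity.getRecord().topic());
    metadata.put("kafka.offset", entity.getRecord().offset());
    metadata.put("kafka.timestamp", entity.getRecord().timestamp());
    final Header encoding = entity.getRecord().headers().lastHeader("encoding");
    if (encoding != null) {
      metadata.put("kafka.header.encoding", new String(encoding.value(), StandardCharsets.UTF_8));
    }
    return metadata;
  }
}
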
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index b65910e6cc91..e85e6fbb6da8 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
@@ -56,7 +57,7 @@
/**
* Kafka indexing task runner supporting incremental segments publishing
*/
-public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<Integer, Long>
+public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity>
{
private static final EmittingLogger log = new EmittingLogger(IncrementalPublishingKafkaIndexTaskRunner.class);
private final KafkaIndexTask task;
@@ -85,15 +86,15 @@ protected Long getNextStartOffset(@NotNull Long sequenceNumber)
@Nonnull
@Override
- protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
- RecordSupplier<Integer, Long> recordSupplier,
+ protected List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> getRecords(
+ RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
TaskToolbox toolbox
) throws Exception
{
// Handles OffsetOutOfRangeException, which is thrown if the seeked-to
// offset is not present in the topic-partition. This can happen if we're asking a task to read from data
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
- List<OrderedPartitionableRecord<Integer, Long>> records = new ArrayList<>();
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> records = new ArrayList<>();
try {
records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
}
@@ -121,7 +122,7 @@ protected SeekableStreamEndSequenceNumbers deserializePartitionsF
private void possiblyResetOffsetsOrWait(
Map<TopicPartition, Long> outOfRangePartitions,
- RecordSupplier<Integer, Long> recordSupplier,
+ RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
TaskToolbox taskToolbox
) throws InterruptedException, IOException
{
@@ -192,7 +193,7 @@ protected OrderedSequenceNumber createSequenceNumber(Long sequenceNumber)
@Override
protected void possiblyResetDataSourceMetadata(
TaskToolbox toolbox,
- RecordSupplier<Integer, Long> recordSupplier,
+ RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
Set<StreamPartition<Integer>> assignment
)
{
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 7e09acf8cb89..bd98510027d5 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -39,7 +40,7 @@
import java.util.Set;
import java.util.stream.Collectors;
-public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
+public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
{
private static final String TYPE = "index_kafka";
@@ -122,7 +123,7 @@ static void assignPartitions(
}
@Override
- protected SeekableStreamIndexTaskRunner<Integer, Long> createTaskRunner()
+ protected SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> createTaskRunner()
{
//noinspection unchecked
return new IncrementalPublishingKafkaIndexTaskRunner(
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index d26f9340bc79..1397cddf22dc 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -51,7 +52,7 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
-public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
+public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
{
private final KafkaConsumer<byte[], byte[]> consumer;
private boolean closed;
@@ -119,15 +120,16 @@ public Set> getAssignment()
@Nonnull
@Override
- public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+ public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
{
- List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+
polledRecords.add(new OrderedPartitionableRecord<>(
record.topic(),
record.partition(),
record.offset(),
- record.value() == null ? null : ImmutableList.of(record.value())
+ record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record))
));
}
return polledRecords;
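
[Reviewer note, not part of the patch] A sketch of the per-record mapping poll() now performs. A Kafka tombstone (null value) still yields an OrderedPartitionableRecord with a null data list, so its key and headers never reach an InputFormat, matching the NOTE on KafkaRecordEntity:

import com.google.common.collect.ImmutableList;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class PollMappingSketch
{
  public static void main(String[] args)
  {
    ConsumerRecord<byte[], byte[]> tombstone =
        new ConsumerRecord<>("topic", 0, 42L, "key".getBytes(StandardCharsets.UTF_8), null);
    // same expression poll() uses: non-null values are wrapped whole, so Kafka-aware
    // InputFormats can reach headers/key/timestamp; tombstones map to null
    List<KafkaRecordEntity> data =
        tombstone.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(tombstone));
    System.out.println(data); // null
  }
}
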
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 021d91664f1b..d592f789953f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -74,7 +75,7 @@
* tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of
* Kafka offsets.
*/
-public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
+public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, KafkaRecordEntity>
{
public static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS_TYPE_REF =
new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
@@ -121,7 +122,7 @@ public KafkaSupervisor(
@Override
- protected RecordSupplier<Integer, Long> setupRecordSupplier()
+ protected RecordSupplier<Integer, Long, KafkaRecordEntity> setupRecordSupplier()
{
return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper);
}
@@ -199,7 +200,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
}
@Override
- protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(
+ protected List<SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>> createIndexTasks(
int replicas,
String baseSequenceName,
ObjectMapper sortingMapper,
@@ -217,7 +218,7 @@ protected List> createIndexTasks(
// Kafka index task will pick up LegacyKafkaIndexTaskRunner without the below configuration.
context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
- List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
+ List<SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KafkaIndexTask(
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index fa86c49933a6..3ca1c0ba4a67 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -19,10 +19,14 @@
package org.apache.druid.indexing.kafka;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -36,6 +40,19 @@
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
@@ -74,7 +91,9 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -91,6 +110,9 @@
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryConfig;
@@ -107,6 +129,7 @@
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
@@ -123,6 +146,7 @@
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
@@ -139,6 +163,7 @@
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -153,6 +178,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
@SuppressWarnings("unchecked")
@RunWith(Parameterized.class)
@@ -160,14 +186,35 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
private static final long POLL_RETRY_MS = 100;
+ private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(new Header()
+ {
+ @Override
+ public String key()
+ {
+ return "encoding";
+ }
+
+ @Override
+ public byte[] value()
+ {
+ return "application/json".getBytes(StandardCharsets.UTF_8);
+ }
+ });
private static TestingCluster zkServer;
private static TestBroker kafkaServer;
private static ServiceEmitter emitter;
private static int topicPostfix;
+ static final Module TEST_MODULE = new SimpleModule("kafkaTestModule").registerSubtypes(
+ new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat")
+ );
+
static {
- new KafkaIndexTaskModule().getJacksonModules().forEach(OBJECT_MAPPER::registerModule);
+ Stream.concat(
+ new KafkaIndexTaskModule().getJacksonModules().stream(),
+ Stream.of(TEST_MODULE)
+ ).forEach(OBJECT_MAPPER::registerModule);
}
@Parameterized.Parameters(name = "{0}")
@@ -199,41 +246,41 @@ public static Iterable constructorFeeder()
private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
{
return ImmutableList.of(
- new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+ new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
+ new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2009", "b", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
+ new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2010", "c", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
+ new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2011", "d", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
+ new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2011", "e", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
+ new ProducerRecord<>(topic, 0, null, jbb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
new ProducerRecord<>(topic, 0, null, null),
- new ProducerRecord<>(topic, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "10", "notanumber", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "10", "20.0", "notanumber")),
- new ProducerRecord<>(topic, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0"))
+ new ProducerRecord<>(topic, 0, null, jbb("2013", "f", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2049", "f", "y", "notanumber", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2049", "f", "y", "10", "notanumber", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2049", "f", "y", "10", "20.0", "notanumber")),
+ new ProducerRecord<>(topic, 1, null, jbb("2012", "g", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 1, null, jbb("2011", "h", "y", "10", "20.0", "1.0"))
);
}
private static List<ProducerRecord<byte[], byte[]>> generateSinglePartitionRecords(String topic)
{
return ImmutableList.of(
- new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0"))
+ new ProducerRecord<>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2009", "b", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2010", "c", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2011", "d", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2011", "D", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2012", "e", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2009", "B", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2008", "A", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2009", "B", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2010", "C", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2011", "D", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2011", "d", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2012", "E", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb("2009", "b", "x", "10", "20.0", "1.0"))
);
}
@@ -1113,6 +1160,87 @@ public void testRunWithTransformSpec() throws Exception
Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", publishedDescriptors.get(0)));
}
+ @Test(timeout = 60_000L)
+ public void testKafkaRecordEntityInputFormat() throws Exception
+ {
+ // Insert data
+ insertData(Iterables.limit(records, 3));
+
+ final KafkaIndexTask task = createTask(
+ null,
+ new DataSchema(
+ "test_ds",
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim1t"),
+ new StringDimensionSchema("dim2"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat"),
+ new StringDimensionSchema("kafka.topic"),
+ new LongDimensionSchema("kafka.offset"),
+ new StringDimensionSchema("kafka.header.encoding")
+ ),
+ null,
+ null
+ ),
+ new AggregatorFactory[]{
+ new DoubleSumAggregatorFactory("met1sum", "met1"),
+ new CountAggregatorFactory("rows")
+ },
+ new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+ null
+ ),
+ new KafkaIndexTaskIOConfig(
+ 0,
+ "sequence0",
+ new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()),
+ new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L)),
+ kafkaServer.consumerProperties(),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ true,
+ null,
+ null,
+ new TestKafkaInputFormat(INPUT_FORMAT)
+ )
+ );
+ Assert.assertTrue(task.supportsQueries());
+
+ final ListenableFuture<TaskStatus> future = runTask(task);
+
+ while (countEvents(task) != 3) {
+ Thread.sleep(25);
+ }
+
+ Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+
+ final QuerySegmentSpec interval = OBJECT_MAPPER.readValue(
+ "\"2008/2012\"", QuerySegmentSpec.class
+ );
+ List<ScanResultValue> scanResultValues = scanData(task, interval);
+ // verify that the first 3 records are all returned by the scan
+ Assert.assertEquals(3, Iterables.size(scanResultValues));
+
+ int i = 0;
+ for (ScanResultValue result : scanResultValues) {
+ final Map<String, Object> event = ((List<Map<String, Object>>) result.getEvents()).get(0);
+ Assert.assertEquals((long) i++, event.get("kafka.offset"));
+ Assert.assertEquals(topic, event.get("kafka.topic"));
+ Assert.assertEquals("application/json", event.get("kafka.header.encoding"));
+ }
+ // insert remaining data
+ insertData(Iterables.skip(records, 3));
+
+ // Wait for task to exit
+ Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+ // Check metrics
+ Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getProcessed());
+ Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
+ Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+ }
+
@Test(timeout = 60_000L)
public void testRunOnNothing() throws Exception
{
@@ -2492,6 +2620,11 @@ private List scanData(final Task task, QuerySegmentSpec spec)
}
private void insertData() throws ExecutionException, InterruptedException
+ {
+ insertData(records);
+ }
+
+ private void insertData(Iterable<ProducerRecord<byte[], byte[]>> records) throws ExecutionException, InterruptedException
{
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
@@ -2629,9 +2762,12 @@ private void makeToolboxFactory() throws IOException
final TestUtils testUtils = new TestUtils();
rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
+
for (Module module : new KafkaIndexTaskModule().getJacksonModules()) {
objectMapper.registerModule(module);
}
+ objectMapper.registerModule(TEST_MODULE);
+
final TaskConfig taskConfig = new TaskConfig(
new File(directory, "baseDir").getPath(),
new File(directory, "baseTaskDir").getPath(),
@@ -2791,15 +2927,15 @@ public void testMultipleLinesJSONText() throws Exception
"{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }" +
"{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
- ProducerRecord[] producerRecords = new ProducerRecord[]{
+ ProducerRecord[] producerRecords = new ProducerRecord[]{
// pretty formatted
- new ProducerRecord<>(topic, 0, null, jb(true, "2049", "d1", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jbb(true, "2049", "d1", "y", "10", "20.0", "1.0")),
//well-formed
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)),
//ill-formed
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(illformed)),
//a well-formed record after ill-formed to demonstrate that the ill-formed can be successfully skipped
- new ProducerRecord<>(topic, 0, null, jb(true, "2049", "d7", "y", "10", "20.0", "1.0"))
+ new ProducerRecord<>(topic, 0, null, jbb(true, "2049", "d7", "y", "10", "20.0", "1.0"))
};
for (ProducerRecord<byte[], byte[]> record : producerRecords) {
kafkaProducer.send(record).get();
@@ -2847,4 +2983,64 @@ public void testMultipleLinesJSONText() throws Exception
newDataSchemaMetadata()
);
}
+
+ public static class TestKafkaInputFormat implements InputFormat
+ {
+ final InputFormat baseInputFormat;
+
+ @JsonCreator
+ public TestKafkaInputFormat(@JsonProperty("baseInputFormat") InputFormat baseInputFormat)
+ {
+ this.baseInputFormat = baseInputFormat;
+ }
+
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+ {
+ final KafkaRecordEntity recordEntity = (KafkaRecordEntity) source;
+ final InputEntityReader delegate = baseInputFormat.createReader(inputRowSchema, recordEntity, temporaryDirectory);
+ return new InputEntityReader()
+ {
+ @Override
+ public CloseableIterator<InputRow> read() throws IOException
+ {
+ return delegate.read().map(
+ r -> {
+ MapBasedInputRow row = (MapBasedInputRow) r;
+ final Map<String, Object> event = new HashMap<>(row.getEvent());
+ event.put("kafka.offset", recordEntity.getRecord().offset());
+ event.put("kafka.topic", recordEntity.getRecord().topic());
+ event.put(
+ "kafka.header.encoding",
+ new String(
+ recordEntity.getRecord().headers().lastHeader("encoding").value(),
+ StandardCharsets.UTF_8
+ )
+ );
+
+ return new MapBasedInputRow(row.getTimestamp(), row.getDimensions(), event);
+ }
+ );
+ }
+
+ @Override
+ public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
+ {
+ return delegate.sample();
+ }
+ };
+ }
+
+ @JsonProperty
+ public InputFormat getBaseInputFormat()
+ {
+ return baseInputFormat;
+ }
+ }
}
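
[Reviewer note, not part of the patch] The test wires TestKafkaInputFormat straight into OBJECT_MAPPER via a SimpleModule; an extension shipping a Kafka-aware format would do the equivalent through a DruidModule. A sketch under that assumption, reusing the test's format class purely for illustration (a real extension would register its own class and type name):

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import java.util.Collections;
import java.util.List;
import org.apache.druid.initialization.DruidModule;

public class KafkaAwareInputFormatModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    // registers the format under the "type" name used in ioConfig.inputFormat
    return Collections.singletonList(
        new SimpleModule("kafkaAwareInputFormat").registerSubtypes(
            new NamedType(KafkaIndexTaskTest.TestKafkaInputFormat.class, "testKafkaInputFormat")
        )
    );
  }

  @Override
  public void configure(Binder binder)
  {
    // nothing to bind; the InputFormat is pure Jackson configuration
  }
}
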
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
index ca152210f57a..31a3ac09044c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java
@@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.curator.test.TestingCluster;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@@ -32,6 +33,7 @@
import org.apache.druid.metadata.DynamicConfigProvider;
import org.apache.druid.metadata.MapStringDynamicConfigProvider;
import org.apache.druid.segment.TestHelper;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
@@ -111,7 +113,7 @@ private static String getTopicName()
return "topic-" + topicPosFix++;
}
- private List<OrderedPartitionableRecord<Integer, Long>> createOrderedPartitionableRecords()
+ private List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> createOrderedPartitionableRecords()
{
Map<Integer, Long> partitionToOffset = new HashMap<>();
return records.stream().map(r -> {
@@ -126,7 +128,9 @@ private List> createOrderedPartitionab
topic,
r.partition(),
offset,
- r.value() == null ? null : Collections.singletonList(r.value())
+ r.value() == null ? null : Collections.singletonList(new KafkaRecordEntity(
+ new ConsumerRecord<>(r.topic(), r.partition(), offset, r.key(), r.value())
+ ))
);
}).collect(Collectors.toList());
}
@@ -265,9 +269,9 @@ public void testPollCustomDeserializer() throws InterruptedException, ExecutionE
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
- List<OrderedPartitionableRecord<Integer, Long>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
- List<OrderedPartitionableRecord<Integer, Long>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
Thread.sleep(200);
@@ -298,9 +302,9 @@ public void testPoll() throws InterruptedException, ExecutionException
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
- List<OrderedPartitionableRecord<Integer, Long>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = new ArrayList<>(createOrderedPartitionableRecords());
- List<OrderedPartitionableRecord<Integer, Long>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
for (int i = 0; polledRecords.size() != initialRecords.size() && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
Thread.sleep(200);
@@ -339,7 +343,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE
recordSupplier.assign(partitions);
recordSupplier.seekToEarliest(partitions);
- List<OrderedPartitionableRecord<Integer, Long>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
for (int i = 0; polledRecords.size() != 13 && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
Thread.sleep(200);
@@ -361,7 +365,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE
Thread.sleep(200);
}
- List<OrderedPartitionableRecord<Integer, Long>> initialRecords = createOrderedPartitionableRecords();
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = createOrderedPartitionableRecords();
Assert.assertEquals(records.size(), polledRecords.size());
Assert.assertEquals(partitions, recordSupplier.getAssignment());
@@ -416,9 +420,9 @@ public void testSeek() throws InterruptedException, ExecutionException
recordSupplier.seek(partition0, 2L);
recordSupplier.seek(partition1, 2L);
- List<OrderedPartitionableRecord<Integer, Long>> initialRecords = createOrderedPartitionableRecords();
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> initialRecords = createOrderedPartitionableRecords();
- List<OrderedPartitionableRecord<Integer, Long>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
for (int i = 0; polledRecords.size() != 11 && i < pollRetry; i++) {
polledRecords.addAll(recordSupplier.poll(poll_timeout_millis));
Thread.sleep(200);
@@ -457,7 +461,7 @@ public void testSeekToLatest() throws InterruptedException, ExecutionException
Assert.assertEquals(0L, (long) recordSupplier.getEarliestSequenceNumber(partition1));
recordSupplier.seekToLatest(partitions);
- List<OrderedPartitionableRecord<Integer, Long>> polledRecords = recordSupplier.poll(poll_timeout_millis);
+ List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = recordSupplier.poll(poll_timeout_millis);
Assert.assertEquals(Collections.emptyList(), polledRecords);
recordSupplier.close();
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 251d8376a690..1bded68998f7 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -34,6 +34,7 @@
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
@@ -3822,7 +3823,7 @@ public TestableKafkaSupervisor(
}
@Override
- protected RecordSupplier<Integer, Long> setupRecordSupplier()
+ protected RecordSupplier<Integer, Long, KafkaRecordEntity> setupRecordSupplier()
{
final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
consumerConfigs.put("metadata.max.age.ms", "1");
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 0fe57015fe29..0d82d96833b1 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -26,6 +26,7 @@
import com.google.common.base.Preconditions;
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -33,7 +34,7 @@
import java.util.Map;
-public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
+public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
private static final String TYPE = "index_kinesis";
@@ -63,7 +64,7 @@ public KinesisIndexTask(
}
@Override
- protected SeekableStreamIndexTaskRunner<String, String> createTaskRunner()
+ protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner()
{
//noinspection unchecked
return new KinesisIndexTaskRunner(
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 2c65cd036dc0..4ab66a6e1cdb 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -21,6 +21,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -48,7 +49,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
-public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String>
+public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String, ByteEntity>
{
private static final EmittingLogger log = new EmittingLogger(KinesisIndexTaskRunner.class);
private static final long POLL_TIMEOUT = 100;
@@ -80,8 +81,8 @@ protected String getNextStartOffset(String sequenceNumber)
@Nonnull
@Override
- protected List<OrderedPartitionableRecord<String, String>> getRecords(
- RecordSupplier<String, String> recordSupplier, TaskToolbox toolbox
+ protected List<OrderedPartitionableRecord<String, String, ByteEntity>> getRecords(
+ RecordSupplier<String, String, ByteEntity> recordSupplier, TaskToolbox toolbox
)
{
return recordSupplier.poll(POLL_TIMEOUT);
@@ -118,7 +119,7 @@ protected OrderedSequenceNumber createSequenceNumber(String sequenceNumb
@Override
protected void possiblyResetDataSourceMetadata(
TaskToolbox toolbox,
- RecordSupplier<String, String> recordSupplier,
+ RecordSupplier<String, String, ByteEntity> recordSupplier,
Set<StreamPartition<String>> assignment
)
{
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index 1c01ceb5dc75..e90b464b45bb 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -48,6 +48,7 @@
import com.google.common.collect.Queues;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.aws.AWSCredentialsUtils;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -92,7 +93,7 @@
* This class implements a local buffer for storing fetched Kinesis records. Fetching is done
* in background threads.
*/
-public class KinesisRecordSupplier implements RecordSupplier<String, String>
+public class KinesisRecordSupplier implements RecordSupplier<String, String, ByteEntity>
{
private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class);
private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000;
@@ -114,24 +115,6 @@ private static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
return isIOException || isTimeout || isInternalError;
}
- /**
- * Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing
- * array itself. Does not modify position or limit of the buffer.
- */
- private static byte[] toByteArray(final ByteBuffer buffer)
- {
- if (buffer.hasArray()
- && buffer.arrayOffset() == 0
- && buffer.position() == 0
- && buffer.array().length == buffer.limit()) {
- return buffer.array();
- } else {
- final byte[] retVal = new byte[buffer.remaining()];
- buffer.duplicate().get(retVal);
- return retVal;
- }
- }
-
/**
* Catch any exception and wrap it in a {@link StreamException}
*/
@@ -234,7 +217,7 @@ private Runnable fetchRecords()
// used for retrying on InterruptedException
GetRecordsResult recordsResult = null;
- OrderedPartitionableRecord<String, String> currRecord;
+ OrderedPartitionableRecord<String, String, ByteEntity> currRecord;
try {
@@ -267,7 +250,7 @@ private Runnable fetchRecords()
// list will come back empty if there are no records
for (Record kinesisRecord : recordsResult.getRecords()) {
- final List<byte[]> data;
+ final List<ByteEntity> data;
if (deaggregate) {
@@ -282,10 +265,10 @@ private Runnable fetchRecords()
);
for (Object userRecord : userRecords) {
- data.add(toByteArray((ByteBuffer) getDataHandle.invoke(userRecord)));
+ data.add(new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord)));
}
} else {
- data = Collections.singletonList(toByteArray(kinesisRecord.getData()));
+ data = Collections.singletonList(new ByteEntity(kinesisRecord.getData()));
}
currRecord = new OrderedPartitionableRecord<>(
@@ -296,14 +279,22 @@ private Runnable fetchRecords()
);
- log.trace(
- "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
- currRecord.getStream(),
- currRecord.getPartitionId(),
- currRecord.getSequenceNumber(),
- records.remainingCapacity(),
- currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())
- );
+ if (log.isTraceEnabled()) {
+ log.trace(
+ "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
+ currRecord.getStream(),
+ currRecord.getPartitionId(),
+ currRecord.getSequenceNumber(),
+ records.remainingCapacity(),
+ currRecord.getData()
+ .stream()
+ .map(b -> StringUtils.fromUtf8(
+ // duplicate buffer to avoid changing its position when logging
+ b.getBuffer().duplicate())
+ )
+ .collect(Collectors.toList())
+ );
+ }
// If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
// from this message and back off for a bit to let the buffer drain before retrying.
@@ -427,7 +418,7 @@ private long getPartitionTimeLag()
private final ConcurrentMap<StreamPartition<String>, PartitionResource> partitionResources =
new ConcurrentHashMap<>();
- private BlockingQueue<OrderedPartitionableRecord<String, String>> records;
+ private BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> records;
private final boolean backgroundFetchEnabled;
private volatile boolean closed = false;
@@ -636,14 +627,14 @@ public String getPosition(StreamPartition partition)
@Nonnull
@Override
- public List<OrderedPartitionableRecord<String, String>> poll(long timeout)
+ public List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long timeout)
{
start();
try {
int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
- List<OrderedPartitionableRecord<String, String>> polledRecords = new ArrayList<>(expectedSize);
+ List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = new ArrayList<>(expectedSize);
Queues.drain(
records,
@@ -928,7 +919,7 @@ private void filterBufferAndResetBackgroundFetch(Set> pa
}
// filter records in buffer and only retain ones whose partition was not seeked
- BlockingQueue<OrderedPartitionableRecord<String, String>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+ BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
records.stream()
.filter(x -> !partitions.contains(x.getStreamPartition()))
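
[Reviewer note, not part of the patch] On the trace-logging change above: ByteEntity exposes its live ByteBuffer, so the logging path duplicates it before decoding. A small sketch of why duplicate() is safe there:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class DuplicateBufferSketch
{
  public static void main(String[] args)
  {
    ByteBuffer data = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
    // duplicate() shares the bytes but has independent position/limit, so decoding
    // it for a log line leaves the original buffer readable by the ingestion path
    String logged = StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    System.out.println(logged);           // payload
    System.out.println(data.remaining()); // 7 -- position untouched
  }
}
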
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index ba5129a3d93d..a7bc5997d655 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata;
@@ -72,7 +73,7 @@
* tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of
* Kinesis sequences.
*/
-public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
+public class KinesisSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity>
{
private static final EmittingLogger log = new EmittingLogger(KinesisSupervisor.class);
@@ -150,7 +151,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
}
@Override
- protected List<SeekableStreamIndexTask<String, String>> createIndexTasks(
+ protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(
int replicas,
String baseSequenceName,
ObjectMapper sortingMapper,
@@ -164,7 +165,7 @@ protected List> createIndexTasks(
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);
- List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
+ List<SeekableStreamIndexTask<String, String, ByteEntity>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KinesisIndexTask(
@@ -182,7 +183,7 @@ protected List> createIndexTasks(
@Override
- protected RecordSupplier<String, String> setupRecordSupplier() throws RuntimeException
+ protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier() throws RuntimeException
{
KinesisSupervisorIOConfig ioConfig = spec.getIoConfig();
KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 2c32ece55e77..37d699a39eb4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -40,6 +40,7 @@
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
@@ -153,7 +154,6 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
private static final String SHARD_ID1 = "1";
private static final String SHARD_ID0 = "0";
private static KinesisRecordSupplier recordSupplier;
- private static List<OrderedPartitionableRecord<String, String>> records;
private static ServiceEmitter emitter;
@@ -216,7 +216,6 @@ public void setupTest() throws IOException, InterruptedException
maxParseExceptions = null;
maxSavedParseExceptions = null;
doHandoff = true;
- records = generateRecords(STREAM);
reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" + System.currentTimeMillis(), "json");
maxRecordsPerPoll = 1;
@@ -249,7 +248,24 @@ public static void tearDownClass() throws Exception
emitter.close();
}
- private static List<OrderedPartitionableRecord<String, String>> generateRecords(String stream)
+ // records can only be read once, hence we generate fresh records every time
+ private static List<OrderedPartitionableRecord<String, String, ByteEntity>> generateRecords(int start)
+ {
+ final List<OrderedPartitionableRecord<String, String, ByteEntity>> records = generateRecords(STREAM);
+ return records.subList(start, records.size());
+ }
+
+ private static List<OrderedPartitionableRecord<String, String, ByteEntity>> generateRecords(int start, int end)
+ {
+ return generateRecords(STREAM).subList(start, end);
+ }
+
+ private List<OrderedPartitionableRecord<String, String, ByteEntity>> generateSinglePartitionRecords(int start, int end)
+ {
+ return generateSinglePartitionRecords(STREAM).subList(start, end);
+ }
+
+ private static List<OrderedPartitionableRecord<String, String, ByteEntity>> generateRecords(String stream)
{
return ImmutableList.of(
new OrderedPartitionableRecord<>(stream, "1", "0", jbl("2008", "a", "y", "10", "20.0", "1.0")),
@@ -267,15 +283,15 @@ private static List> generateRecords(
stream,
"1",
"6",
- Collections.singletonList(StringUtils.toUtf8("unparseable"))
+ Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable")))
),
new OrderedPartitionableRecord<>(
stream,
"1",
"7",
- Collections.singletonList(StringUtils.toUtf8("unparseable2"))
+ Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable2")))
),
- new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))),
+ new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}")))),
new OrderedPartitionableRecord<>(stream, "1", "9", jbl("2013", "f", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "10", jbl("2049", "f", "y", "notanumber", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "11", jbl("2049", "f", "y", "10", "notanumber", "1.0")),
@@ -285,7 +301,7 @@ private static List> generateRecords(
);
}
- private static List<OrderedPartitionableRecord<String, String>> generateSinglePartitionRecords(String stream)
+ private static List<OrderedPartitionableRecord<String, String, ByteEntity>> generateSinglePartitionRecords(String stream)
{
return ImmutableList.of(
new OrderedPartitionableRecord<>(stream, "1", "0", jbl("2008", "a", "y", "10", "20.0", "1.0")),
@@ -317,7 +333,7 @@ public void testRunAfterDataInserted() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 5)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 5)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -384,7 +400,7 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 5)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 5)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -459,7 +475,7 @@ public void testRunBeforeDataInserted() throws Exception
EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(Collections.emptyList())
.times(5)
- .andReturn(records.subList(13, 15))
+ .andReturn(generateRecords(13, 15))
.once();
recordSupplier.close();
@@ -531,9 +547,9 @@ public void testIncrementalHandOff() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 5))
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 5))
.once()
- .andReturn(records.subList(4, records.size()))
+ .andReturn(generateRecords(4))
.once();
recordSupplier.close();
@@ -643,11 +659,11 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 3))
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 3))
.once()
- .andReturn(records.subList(2, 10))
+ .andReturn(generateRecords(2, 10))
.once()
- .andReturn(records.subList(9, 11));
+ .andReturn(generateRecords(9, 11));
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -774,7 +790,7 @@ public void testRunWithMinimumMessageTime() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 13)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 13)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -844,7 +860,7 @@ public void testRunWithMaximumMessageTime() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 13)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 13)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -916,7 +932,7 @@ public void testRunWithTransformSpec() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(0, 13)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(0, 13)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -994,7 +1010,7 @@ public void testRunOnSingletonRange() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 3)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 3)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -1053,7 +1069,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -1123,7 +1139,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -1196,7 +1212,7 @@ public void testReportParseExceptions() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -1256,7 +1272,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -1356,7 +1372,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -1439,7 +1455,9 @@ public void testRunReplicas() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13)).times(2);
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong()))
+ .andReturn(generateRecords(2, 13)).once()
+ .andReturn(generateRecords(2, 13)).once();
recordSupplier.close();
EasyMock.expectLastCall().times(2);
@@ -1530,9 +1548,9 @@ public void testRunConflicting() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13))
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13))
.once()
- .andReturn(records.subList(3, 13))
+ .andReturn(generateRecords(3, 13))
.once();
recordSupplier.close();
@@ -1622,9 +1640,9 @@ public void testRunConflictingWithoutTransactions() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13))
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13))
.once()
- .andReturn(records.subList(3, 13))
+ .andReturn(generateRecords(3, 13))
.once();
recordSupplier.close();
@@ -1715,7 +1733,7 @@ public void testRunOneTaskTwoPartitions() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, records.size())).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2)).once();
recordSupplier.close();
EasyMock.expectLastCall().once();
@@ -1791,9 +1809,9 @@ public void testRunTwoTasksTwoPartitions() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13))
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13))
.once()
- .andReturn(records.subList(13, 15))
+ .andReturn(generateRecords(13, 15))
.once();
recordSupplier.close();
@@ -1884,7 +1902,7 @@ public void testRestore() throws Exception
EasyMock.expectLastCall();
recordSupplier.seek(streamPartition, "2");
EasyMock.expectLastCall();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 4))
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 4))
.once()
.andReturn(Collections.emptyList())
.anyTimes();
@@ -1935,7 +1953,7 @@ public void testRestore() throws Exception
EasyMock.expectLastCall();
recordSupplier.seek(streamPartition, "3");
EasyMock.expectLastCall();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(3, 6)).once();
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(3, 6)).once();
recordSupplier.assign(ImmutableSet.of());
EasyMock.expectLastCall();
recordSupplier.close();
@@ -2005,7 +2023,7 @@ public void testRestoreAfterPersistingSequences() throws Exception
{
maxRowsPerSegment = 2;
maxRecordsPerPoll = 1;
- records = generateSinglePartitionRecords(STREAM);
+ List<OrderedPartitionableRecord<String, String, ByteEntity>> records = generateSinglePartitionRecords(STREAM);
recordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
@@ -2162,7 +2180,7 @@ public void testRunWithPauseAndResume() throws Exception
EasyMock.expectLastCall();
recordSupplier.seek(streamPartition, "2");
EasyMock.expectLastCall();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 5))
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 5))
.once()
.andReturn(Collections.emptyList())
.anyTimes();
@@ -2273,7 +2291,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(records.subList(2, 13))
+ EasyMock.expect(recordSupplier.poll(EasyMock.anyLong())).andReturn(generateRecords(2, 13))
.once();
recordSupplier.close();
@@ -2340,8 +2358,6 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
@Test(timeout = 5000L)
public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
{
- records = generateSinglePartitionRecords(STREAM);
-
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 2;
@@ -2352,9 +2368,9 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
EasyMock.expect(recordSupplier1.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
recordSupplier1.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier1.poll(EasyMock.anyLong())).andReturn(records.subList(0, 5))
+ EasyMock.expect(recordSupplier1.poll(EasyMock.anyLong())).andReturn(generateSinglePartitionRecords(0, 5))
.once()
- .andReturn(records.subList(4, 10))
+ .andReturn(generateSinglePartitionRecords(4, 10))
.once();
recordSupplier1.close();
EasyMock.expectLastCall().once();
@@ -2364,9 +2380,9 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
EasyMock.expect(recordSupplier2.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
recordSupplier2.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- EasyMock.expect(recordSupplier2.poll(EasyMock.anyLong())).andReturn(records.subList(0, 5))
+ EasyMock.expect(recordSupplier2.poll(EasyMock.anyLong())).andReturn(generateSinglePartitionRecords(0, 5))
.once()
- .andReturn(records.subList(4, 10))
+ .andReturn(generateSinglePartitionRecords(4, 10))
.once();
recordSupplier2.close();
EasyMock.expectLastCall().once();
@@ -2579,12 +2595,12 @@ public void testEndOfShard() throws Exception
recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
- List<OrderedPartitionableRecord<String, String>> eosRecord = ImmutableList.of(
+ List<OrderedPartitionableRecord<String, String, ByteEntity>> eosRecord = ImmutableList.of(
new OrderedPartitionableRecord<>(STREAM, SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER, null)
);
EasyMock.expect(recordSupplier.poll(EasyMock.anyLong()))
- .andReturn(records.subList(2, 5)).once()
+ .andReturn(generateRecords(2, 5)).once()
.andReturn(eosRecord).once();
recordSupplier.close();
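
Why these tests switched from slicing one shared `records` list to calling `generateRecords(...)`: each record's payload is now a ByteEntity whose underlying buffer is consumed as it is read, so returning the same list from two mocked polls (as testRunReplicas and the conflicting-task tests previously did via `.times(2)` and shared sublists) would hand the second consumer already-drained entities. A plausible sketch of the helper these expectations assume, built on a `generateRecords(STREAM)`-style fixture builder like the one shown later in KinesisSamplerSpecTest; the exact body is hypothetical:

// Hypothetical sketch: rebuild the fixture on every call so each mocked
// poll() returns fresh, unread ByteEntity buffers.
private static List<OrderedPartitionableRecord<String, String, ByteEntity>> generateRecords(int start, int end)
{
  return generateRecords(STREAM).subList(start, end);
}

// Single-argument variant, as used by testRunOneTaskTwoPartitions.
private static List<OrderedPartitionableRecord<String, String, ByteEntity>> generateRecords(int start)
{
  List<OrderedPartitionableRecord<String, String, ByteEntity>> records = generateRecords(STREAM);
  return records.subList(start, records.size());
}
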
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index 3cdd49176b7d..94cdec5670ba 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -34,6 +34,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.ISE;
@@ -95,7 +96,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
x.getSequenceNumber(),
Collections
.singletonList(
- toByteArray(
+ new ByteEntity(
x.getData()))
))
.collect(
@@ -108,7 +109,7 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
x.getSequenceNumber(),
Collections
.singletonList(
- toByteArray(
+ new ByteEntity(
x.getData()))
))
.collect(
@@ -238,7 +239,7 @@ private static GetRecordsRequest generateGetRecordsReq(String shardIterator, int
}
// filter out EOS markers
- private static List<OrderedPartitionableRecord<String, String>> cleanRecords(List<OrderedPartitionableRecord<String, String>> records)
+ private static List<OrderedPartitionableRecord<String, String, ByteEntity>> cleanRecords(List<OrderedPartitionableRecord<String, String, ByteEntity>> records)
{
return records.stream()
.filter(x -> !x.getSequenceNumber()
@@ -312,7 +313,7 @@ public void testPoll() throws InterruptedException
Thread.sleep(100);
}
- List<OrderedPartitionableRecord<String, String>> polledRecords = cleanRecords(recordSupplier.poll(
+ List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS));
verifyAll();
@@ -400,7 +401,7 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException
Thread.sleep(100);
}
- List<OrderedPartitionableRecord<String, String>> polledRecords = cleanRecords(recordSupplier.poll(
+ List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS));
verifyAll();
@@ -479,7 +480,7 @@ public void testSeek()
Thread.sleep(100);
}
- List<OrderedPartitionableRecord<String, String>> polledRecords = cleanRecords(recordSupplier.poll(
+ List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS));
verifyAll();
@@ -644,7 +645,7 @@ public void testPollAfterSeek()
Thread.sleep(100);
}
- OrderedPartitionableRecord<String, String> firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0);
+ OrderedPartitionableRecord<String, String, ByteEntity> firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0);
Assert.assertEquals(
ALL_RECORDS.get(7),
@@ -662,7 +663,7 @@ public void testPollAfterSeek()
}
- OrderedPartitionableRecord<String, String> record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0);
+ OrderedPartitionableRecord<String, String, ByteEntity> record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0);
Assert.assertEquals(ALL_RECORDS.get(9), record2);
// only one partition in this test. second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS
@@ -737,7 +738,7 @@ public void testPollDeaggregate() throws InterruptedException
Thread.sleep(100);
}
- List<OrderedPartitionableRecord<String, String>> polledRecords = cleanRecords(recordSupplier.poll(
+ List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = cleanRecords(recordSupplier.poll(
POLL_TIMEOUT_MILLIS));
verifyAll();
@@ -886,23 +887,4 @@ public void getPartitionTimeLag() throws InterruptedException
}
verifyAll();
}
-
- /**
- * Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing
- * array itself. Does not modify position or limit of the buffer.
- */
- private static byte[] toByteArray(final ByteBuffer buffer)
- {
- if (buffer.hasArray()
- && buffer.arrayOffset() == 0
- && buffer.position() == 0
- && buffer.array().length == buffer.limit()) {
- return buffer.array();
- } else {
- final byte[] retVal = new byte[buffer.remaining()];
- buffer.duplicate().get(retVal);
- return retVal;
- }
- }
-
}
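
The `toByteArray(ByteBuffer)` helper deleted above existed only to turn Kinesis payload buffers into byte[] records; ByteEntity accepts the ByteBuffer directly (as the `new ByteEntity(x.getData())` call sites above show), so the copy and the helper can both go. A minimal standalone sketch of the replacement pattern, assuming ByteEntity's ByteBuffer constructor and open() behave as they are used in this patch:

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.druid.data.input.impl.ByteEntity;

public class ByteEntityDemo
{
  public static void main(String[] args) throws IOException
  {
    ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
    ByteEntity entity = new ByteEntity(payload); // wraps the buffer, no byte[] copy

    try (InputStream in = entity.open()) { // streams the buffer's remaining bytes
      byte[] buf = new byte[16];
      int n = in.read(buf);
      System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8)); // "hello"
    }
  }
}
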
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index dadc6c7a944d..9f469bfe1661 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -89,7 +90,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
private final KinesisRecordSupplier recordSupplier = mock(KinesisRecordSupplier.class);
- private static List<OrderedPartitionableRecord<String, String>> generateRecords(String stream)
+ private static List<OrderedPartitionableRecord<String, String, ByteEntity>> generateRecords(String stream)
{
return ImmutableList.of(
new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
@@ -105,9 +106,9 @@ private static List<OrderedPartitionableRecord<String, String>> generateRecords(
stream,
"1",
"6",
- Collections.singletonList(StringUtils.toUtf8("unparseable"))
+ Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable")))
),
- new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}")))
+ new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}"))))
);
}
@@ -274,10 +275,10 @@ public void testSample() throws Exception
Assert.assertFalse(it.hasNext());
}
- private static List<byte[]> jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1)
+ private static List<ByteEntity> jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1)
{
try {
- return Collections.singletonList(new ObjectMapper().writeValueAsBytes(
+ return Collections.singletonList(new ByteEntity(new ObjectMapper().writeValueAsBytes(
ImmutableMap.builder()
.put("timestamp", ts)
.put("dim1", dim1)
@@ -286,7 +287,7 @@ private static List<byte[]> jb(String ts, String dim1, String dim2, String dimLo
.put("dimFloat", dimFloat)
.put("met1", met1)
.build()
- ));
+ )));
}
catch (Exception e) {
throw new RuntimeException(e);
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 3d57a6bbe1b2..77c30d4fa383 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -27,6 +27,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -5268,7 +5269,7 @@ protected String generateSequenceName(
}
@Override
- protected RecordSupplier<String, String> setupRecordSupplier()
+ protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier()
{
return supervisorRecordSupplier;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
index d523f2d806df..c38757150780 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
@@ -43,15 +43,15 @@
* {@link org.apache.druid.data.input.InputSource} wrapping {@link RecordSupplier}. It will fetch data via
* RecordSupplier and convert it into {@link ByteEntity}. See {@link #createEntityIterator}.
*/
-public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType> extends AbstractInputSource
+public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> extends AbstractInputSource
{
private final String topic;
- private final RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;
+ private final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
private final boolean useEarliestOffset;
public RecordSupplierInputSource(
String topic,
- RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier,
+ RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier,
boolean useEarliestOffset
)
{
@@ -66,7 +66,7 @@ public RecordSupplierInputSource(
}
}
- private void assignAndSeek(RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier)
+ private void assignAndSeek(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier)
throws InterruptedException
{
final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
@@ -113,15 +113,15 @@ protected InputSourceReader formattableReader(
}
/**
- * Returns an iterator converting each byte array from RecordSupplier into a ByteEntity. Note that the
+ * Returns an iterator that yields each record from the RecordSupplier as a ByteEntity. Note that the
* returned iterator will block until the RecordSupplier returns some data.
*/
CloseableIterator<InputEntity> createEntityIterator()
{
return new CloseableIterator<InputEntity>()
{
- private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> recordIterator;
- private Iterator<byte[]> bytesIterator;
+ private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> recordIterator;
+ private Iterator<? extends ByteEntity> bytesIterator;
private volatile boolean closed;
private void waitNextIteratorIfNecessary()
@@ -146,7 +146,7 @@ public boolean hasNext()
@Override
public InputEntity next()
{
- return new ByteEntity(bytesIterator.next());
+ return bytesIterator.next();
}
@Override
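
One detail worth calling out in the hunk above: next() now hands back the entity produced by the RecordSupplier itself instead of wrapping raw bytes in a fresh ByteEntity, which is what lets stream-aware consumers recover the concrete RecordType downstream. A hypothetical in-package usage sketch (the drain method and its caller are made up for illustration):

// Hypothetical in-package sketch: drain the iterator; each element is the
// supplier's own entity instance, not a copy.
static void drain(RecordSupplierInputSource<Integer, Long, KafkaRecordEntity> inputSource) throws IOException
{
  try (CloseableIterator<InputEntity> iterator = inputSource.createEntityIterator()) {
    while (iterator.hasNext()) {
      InputEntity entity = iterator.next();
      // A Kafka-aware consumer could downcast here, e.g.
      // ((KafkaRecordEntity) entity).getRecord().headers()
    }
  }
}
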
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 54292d8c9fb8..d3436ea3ef34 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -25,6 +25,7 @@
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
@@ -57,7 +58,7 @@
import java.util.Map;
-public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
+public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
extends AbstractTask implements ChatHandler
{
public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
@@ -72,7 +73,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
- private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>> runnerSupplier;
+ private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType>> runnerSupplier;
@MonotonicNonNull
protected AuthorizerMapper authorizerMapper;
@@ -259,9 +260,9 @@ public boolean withinMinMaxRecordTime(final InputRow row)
return !beforeMinimumMessageTime && !afterMaximumMessageTime;
}
- protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> createTaskRunner();
+ protected abstract SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType> createTaskRunner();
- protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType> newTaskRecordSupplier();
+ protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier();
@VisibleForTesting
public Appenderator getAppenderator()
@@ -270,7 +271,7 @@ public Appenderator getAppenderator()
}
@VisibleForTesting
- public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner()
+ public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType> getRunner()
{
return runnerSupplier.get();
}
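
For orientation, a hypothetical sketch of how a concrete task binds the new type parameter: Kafka uses Integer partition ids and Long offsets, and KafkaRecordEntity is the entity type this patch introduces. The class name is invented and the remaining abstract members are elided, so this is a shape illustration rather than compilable code:

// Sketch only: binds PartitionIdType=Integer, SequenceOffsetType=Long,
// RecordType=KafkaRecordEntity.
public class MyKafkaTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
{
  @Override
  protected SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> createTaskRunner()
  {
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  protected RecordSupplier<Integer, Long, KafkaRecordEntity> newTaskRecordSupplier()
  {
    throw new UnsupportedOperationException("sketch only");
  }

  // ... constructor and other inherited abstract members elided ...
}
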
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 2a2bf2d7f05f..5ca48523648a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -40,6 +40,7 @@
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
@@ -134,7 +135,7 @@
* @param <PartitionIdType>    Partition Number Type
* @param <SequenceOffsetType> Sequence Number Type
*/
-public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> implements ChatHandler
+public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> implements ChatHandler
{
public enum Status
{
@@ -193,7 +194,7 @@ public enum Status
protected final Lock pollRetryLock = new ReentrantLock();
protected final Condition isAwaitingRetry = pollRetryLock.newCondition();
- private final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task;
+ private final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task;
private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
private final SeekableStreamIndexTaskTuningConfig tuningConfig;
private final InputRowSchema inputRowSchema;
@@ -232,7 +233,7 @@ public enum Status
private volatile Throwable backgroundThreadException;
public SeekableStreamIndexTaskRunner(
- final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task,
+ final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task,
@Nullable final InputRowParser<ByteBuffer> parser,
final AuthorizerMapper authorizerMapper,
final LockGranularity lockGranularityToUse
@@ -370,7 +371,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
);
// Now we can initialize StreamChunkReader with the given toolbox.
- final StreamChunkParser parser = new StreamChunkParser(
+ final StreamChunkParser<RecordType> parser = new StreamChunkParser<>(
this.parser,
inputFormat,
inputRowSchema,
@@ -412,7 +413,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
);
Throwable caughtExceptionOuter = null;
- try (final RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier = task.newTaskRecordSupplier()) {
+ try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier = task.newTaskRecordSupplier()) {
if (toolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
toolbox.getDataSegmentServerAnnouncer().announce();
@@ -600,7 +601,7 @@ public void run()
// calling getRecords() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException
// are handled in the subclasses.
- List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> records = getRecords(
+ List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> records = getRecords(
recordSupplier,
toolbox
);
@@ -609,7 +610,7 @@ public void run()
stillReading = !assignment.isEmpty();
SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToCheckpoint = null;
- for (OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record : records) {
+ for (OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType> record : records) {
final boolean shouldProcess = verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber());
log.trace(
@@ -621,8 +622,7 @@ public void run()
);
if (shouldProcess) {
- final List<byte[]> valueBytess = record.getData();
- final List<InputRow> rows = parser.parse(valueBytess);
+ final List<InputRow> rows = parser.parse(record.getData());
boolean isPersistRequired = false;
final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToUse = sequences
@@ -1112,7 +1112,7 @@ private void maybePersistAndPublishSequences(Supplier<Committer> committerSuppli
}
private Set<StreamPartition<PartitionIdType>> assignPartitions(
- RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier
+ RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier
)
{
final Set<StreamPartition<PartitionIdType>> assignment = new HashSet<>();
@@ -1230,7 +1230,7 @@ private boolean isMoreToReadAfterReadingRecord(
}
private void seekToStartingSequence(
- RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier,
+ RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier,
Set<StreamPartition<PartitionIdType>> partitions
) throws InterruptedException
{
@@ -1914,8 +1914,8 @@ protected abstract SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType>
- protected abstract List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> getRecords(
- RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier,
+ protected abstract List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> getRecords(
+ RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier,
TaskToolbox toolbox
) throws Exception;
@@ -1945,7 +1945,7 @@ protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>
- RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier,
+ RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier,
Set<StreamPartition<PartitionIdType>> assignment
);
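
The practical payoff of threading RecordType through the runner shows up in the parse call above: parser.parse(record.getData()) now receives the supplier's full-fidelity entities rather than bare byte arrays. A hypothetical extension-side fragment sketching what a Kafka-aware InputFormat reader could then do; the surrounding reader is invented, while getRecord() is the accessor KafkaRecordEntity exposes:

// Assumes org.apache.kafka.clients.consumer.ConsumerRecord and
// org.apache.kafka.common.header.Header from kafka-clients on the classpath.
if (entity instanceof KafkaRecordEntity) {
  ConsumerRecord<byte[], byte[]> record = ((KafkaRecordEntity) entity).getRecord();
  long timestamp = record.timestamp();     // Kafka record timestamp
  byte[] key = record.key();               // record key, may be null
  for (Header header : record.headers()) { // iterate record headers
    // header.key() / header.value() could be used to enrich parsed rows
  }
}
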
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index 5c0447704b9b..9175f9510770 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -54,7 +54,7 @@
import java.util.Map;
import java.util.stream.Stream;
-public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetType> implements SamplerSpec
+public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> implements SamplerSpec
{
static final long POLL_TIMEOUT_MS = 100;
@@ -106,7 +106,7 @@ public SamplerResponse sample()
return inputSourceSampler.sample(inputSource, inputFormat, dataSchema, samplerConfig);
}
- protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType> createRecordSupplier();
+ protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> createRecordSupplier();
private class SeekableStreamSamplerFirehoseFactory implements FiniteFirehoseFactory
{
@@ -159,7 +159,7 @@ protected SeekableStreamSamplerFirehose(InputRowParser parser)
((StringInputRowParser) parser).startFileFromBeginning();
}
- RecordSupplierInputSource<PartitionIdType, SequenceOffsetType> inputSource = new RecordSupplierInputSource<>(
+ RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, RecordType> inputSource = new RecordSupplierInputSource<>(
ioConfig.getStream(),
createRecordSupplier(),
ioConfig.isUseEarliestSequenceNumber()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index af926203d6c2..b7cff3130ba0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -183,8 +183,8 @@ boolean isOpen()
}
boolean canHandle(
- SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
- OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
+ SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner,
+ OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, ?> record
)
{
lock.lock();
@@ -240,7 +240,7 @@ public String toString()
}
Supplier<Committer> getCommitterSupplier(
- SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+ SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner,
String stream,
Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
)
@@ -304,7 +304,7 @@ public void run()
}
TransactionalSegmentPublisher createPublisher(
- SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+ SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner,
TaskToolbox toolbox,
boolean useTransaction
)
@@ -319,12 +319,12 @@ TransactionalSegmentPublisher createPublisher(
private class SequenceMetadataTransactionalSegmentPublisher
implements TransactionalSegmentPublisher
{
- private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner;
+ private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner;
private final TaskToolbox toolbox;
private final boolean useTransaction;
public SequenceMetadataTransactionalSegmentPublisher(
- SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
+ SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner,
TaskToolbox toolbox,
boolean useTransaction
)
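
Note that SequenceMetadata takes a `?` wildcard for the record type instead of growing a third parameter: it only inspects partitions and offsets, never record payloads, so a runner producing any ByteEntity subtype satisfies its signatures. A tiny self-contained illustration of the same wildcard pattern, using toy types rather than Druid's:

// Toy illustration: describe() reads only partition/offset state, so the
// payload type parameter can remain an unknown wildcard.
class ToyRecord<P, S, R>
{
  final P partition;
  final S offset;
  final R payload;

  ToyRecord(P partition, S offset, R payload)
  {
    this.partition = partition;
    this.offset = offset;
    this.payload = payload;
  }
}

class WildcardDemo
{
  static <P, S> String describe(ToyRecord<P, S, ?> record)
  {
    return record.partition + "@" + record.offset;
  }

  public static void main(String[] args)
  {
    System.out.println(describe(new ToyRecord<>("shard-1", 42L, new byte[]{1}))); // shard-1@42
  }
}
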
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
index c4089c9d9cf3..3926ae99e773 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamChunkParser.java
@@ -46,7 +46,7 @@
* Abstraction for parsing stream data which internally uses {@link org.apache.druid.data.input.InputEntityReader}
* or {@link InputRowParser}. This class will be useful until we remove the deprecated InputRowParser.
*/
-class StreamChunkParser
+class StreamChunkParser<RecordType extends ByteEntity>
{
@Nullable
private final InputRowParser