Merged
@@ -66,6 +66,9 @@ interface CleanableFile extends Closeable
* Opens an {@link InputStream} on the input entity directly.
* This is the basic way to read the given entity.
*
+ * The behavior of this method is only defined for the first call to open().
+ * The behavior of subsequent calls is undefined and may vary between implementations.
+ *
* @see #fetch
*/
InputStream open() throws IOException;
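Given that contract, a caller that needs more than one pass over the data should materialize the entity via fetch() and reopen the resulting local file rather than calling open() again. A minimal sketch, assuming Druid's InputEntity.fetch(File, byte[]) and CleanableFile.file() APIs; the helper name and buffer size are illustrative:

```java
import com.google.common.io.ByteStreams;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import org.apache.druid.data.input.InputEntity;

public class EntityReadExample
{
  // Reads an entity twice without ever calling open() a second time:
  // fetch() copies the entity to a local file once, and that file can
  // then be reopened any number of times.
  static byte[] readTwice(InputEntity entity, File tempDir) throws IOException
  {
    try (InputEntity.CleanableFile local = entity.fetch(tempDir, new byte[1 << 16])) {
      try (InputStream first = Files.newInputStream(local.file().toPath())) {
        ByteStreams.toByteArray(first); // first pass, e.g. to sample the format
      }
      try (InputStream second = Files.newInputStream(local.file().toPath())) {
        return ByteStreams.toByteArray(second); // second pass, the actual read
      }
    }
  }
}
```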
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.kafka;

import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
* A {@link ByteEntity} generated by {@link KafkaRecordSupplier} and fed to any {@link InputFormat} used by Kafka
* indexing tasks.
* <p>
* It can be used as a regular ByteEntity, in which case the Kafka record value is returned, but the {@link #getRecord}
* method also allows Kafka-aware {@link InputFormat} implementations to read the full Kafka record, including headers,
* key, and timestamp.
* <p>
* NOTE: Any records with null values will be skipped, even if they contain non-null keys or headers.
* <p>
* This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.
*/
public class KafkaRecordEntity extends ByteEntity
Contributor
Could you please add javadocs explaining how this class is meant to be used? Something like:

A ByteEntity generated by {@link KafkaRecordSupplier} and fed to any {@link InputFormat} used by Kafka indexing tasks. It can be used as a regular ByteEntity, in which case the Kafka message value is returned, but the {@link #getRecord} method also allows Kafka-aware InputFormat implementations to read the full Kafka message, including headers, key, and timestamp.

This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.

{
private final ConsumerRecord<byte[], byte[]> record;

public KafkaRecordEntity(ConsumerRecord<byte[], byte[]> record)
{
super(record.value());
this.record = record;
}

public ConsumerRecord<byte[], byte[]> getRecord()
Contributor
Not used anywhere. Is this only to allow writing extensions with a custom InputFormat that can take advantage of the extra metadata available?

Member Author (@xvrl), Jan 7, 2021

That's correct. We could have a sample InputFormat for documentation purposes, though I'm not sure there is much value. A more generic input format for Kafka that wraps multiple input formats would require a lot more thought, and I couldn't think of a one-size-fits-all approach that seemed very useful or avoided additional complexity.

{
return record;
}
}
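Since this functionality is not yet exposed through any built-in InputFormat, here is a sketch of what a Kafka-aware extension could do with getRecord(). The class and method names are hypothetical; only KafkaRecordEntity and the Kafka client APIs are taken from the source:

```java
import java.nio.charset.StandardCharsets;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.kafka.common.header.Header;

// Hypothetical helper for a Kafka-aware InputFormat reader: besides the value
// bytes that a plain ByteEntity exposes, it reads the record's timestamp, key,
// and headers through getRecord().
public class KafkaMetadataExtractor
{
  public static String describe(KafkaRecordEntity entity)
  {
    StringBuilder sb = new StringBuilder();
    sb.append("timestamp=").append(entity.getRecord().timestamp());
    byte[] key = entity.getRecord().key();
    if (key != null) {
      sb.append(" key=").append(new String(key, StandardCharsets.UTF_8));
    }
    for (Header header : entity.getRecord().headers()) {
      byte[] value = header.value();
      sb.append(' ').append(header.key()).append('=')
        .append(value == null ? "null" : new String(value, StandardCharsets.UTF_8));
    }
    return sb.toString();
  }
}
```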
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
@@ -56,7 +57,7 @@
/**
* Kafka indexing task runner supporting incremental segments publishing
*/
-public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<Integer, Long>
+public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity>
{
private static final EmittingLogger log = new EmittingLogger(IncrementalPublishingKafkaIndexTaskRunner.class);
private final KafkaIndexTask task;
@@ -85,15 +86,15 @@ protected Long getNextStartOffset(@NotNull Long sequenceNumber)

@Nonnull
@Override
-protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
-RecordSupplier<Integer, Long> recordSupplier,
+protected List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> getRecords(
+RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
TaskToolbox toolbox
) throws Exception
{
// Handles OffsetOutOfRangeException, which is thrown if the seeked-to
// offset is not present in the topic-partition. This can happen if we're asking a task to read from data
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
-List<OrderedPartitionableRecord<Integer, Long>> records = new ArrayList<>();
+List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> records = new ArrayList<>();
try {
records = recordSupplier.poll(task.getIOConfig().getPollTimeout());
}
@@ -121,7 +122,7 @@ protected SeekableStreamEndSequenceNumbers<Integer, Long> deserializePartitionsF

private void possiblyResetOffsetsOrWait(
Map<TopicPartition, Long> outOfRangePartitions,
-RecordSupplier<Integer, Long> recordSupplier,
+RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
TaskToolbox taskToolbox
) throws InterruptedException, IOException
{
@@ -192,7 +193,7 @@ protected OrderedSequenceNumber<Long> createSequenceNumber(Long sequenceNumber)
@Override
protected void possiblyResetDataSourceMetadata(
TaskToolbox toolbox,
-RecordSupplier<Integer, Long> recordSupplier,
+RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
Set<StreamPartition<Integer>> assignment
)
{
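For orientation, a simplified sketch of the generic plumbing these signature changes thread through. These are abbreviated stand-ins for Druid's real interfaces, not their full definitions, and the extends-ByteEntity bound is inferred from KafkaRecordEntity subclassing ByteEntity:

```java
import java.io.Closeable;
import java.util.List;
import org.apache.druid.data.input.impl.ByteEntity;

// The new third type parameter lets each stream supplier declare the concrete
// entity type it produces, so the Kafka implementation can hand the InputFormat
// a KafkaRecordEntity (headers, key, timestamp) instead of a bare ByteEntity.
interface RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
    extends Closeable
{
  List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> poll(long timeout);
}

// Stub of the record wrapper; the real class also carries the stream name,
// partition id, and sequence number alongside its List<RecordType> payload.
class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
{
}
```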
@@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
@@ -39,7 +40,7 @@
import java.util.Set;
import java.util.stream.Collectors;

-public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
+public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>
{
private static final String TYPE = "index_kafka";

@@ -122,7 +123,7 @@ static void assignPartitions(
}

@Override
-protected SeekableStreamIndexTaskRunner<Integer, Long> createTaskRunner()
+protected SeekableStreamIndexTaskRunner<Integer, Long, KafkaRecordEntity> createTaskRunner()
{
//noinspection unchecked
return new IncrementalPublishingKafkaIndexTaskRunner(
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -51,7 +52,7 @@
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

-public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
+public class KafkaRecordSupplier implements RecordSupplier<Integer, Long, KafkaRecordEntity>
{
private final KafkaConsumer<byte[], byte[]> consumer;
private boolean closed;
@@ -119,15 +120,16 @@ public Set<StreamPartition<Integer>> getAssignment()

@Nonnull
@Override
-public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
{
-List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {

polledRecords.add(new OrderedPartitionableRecord<>(
record.topic(),
record.partition(),
record.offset(),
-record.value() == null ? null : ImmutableList.of(record.value())
+record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record))
Member Author

@himanshug I reverted this change to keep the existing behavior

Contributor

We should put in a comment or javadoc noting that records with null values will be skipped, even if the other fields (key, timestamp, headers) are non-null.

Here's a suggestion:

  1. Add a note to the javadoc of KafkaRecordEntity mentioning that these entities won't be generated for messages with null values
  2. Add a null-value check to the constructor of KafkaRecordEntity enforcing that comment

Member Author

Done. KafkaRecordEntity will already barf on null values when instantiating ByteEntity.

));
}
return polledRecords;
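Following up on the thread above, a tiny illustration of that fail-fast behavior on tombstones (records with a key but a null value). It assumes ByteEntity stores its payload via ByteBuffer.wrap(bytes), which throws NullPointerException for a null array:

```java
import java.nio.charset.StandardCharsets;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TombstoneExample
{
  public static void main(String[] args)
  {
    // A Kafka tombstone: non-null key, null value.
    ConsumerRecord<byte[], byte[]> tombstone = new ConsumerRecord<>(
        "topic", 0, 0L, "key".getBytes(StandardCharsets.UTF_8), null);

    // super(record.value()) hands the null value to ByteEntity; assuming
    // ByteEntity wraps the array with ByteBuffer.wrap(bytes), this throws
    // NullPointerException here rather than later during parsing.
    new KafkaRecordEntity(tombstone);
  }
}
```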
@@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
@@ -74,7 +75,7 @@
* tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of
* Kafka offsets.
*/
-public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
+public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, KafkaRecordEntity>
{
public static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS_TYPE_REF =
new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
@@ -121,7 +122,7 @@ public KafkaSupervisor(


@Override
-protected RecordSupplier<Integer, Long> setupRecordSupplier()
+protected RecordSupplier<Integer, Long, KafkaRecordEntity> setupRecordSupplier()
{
return new KafkaRecordSupplier(spec.getIoConfig().getConsumerProperties(), sortingMapper);
}
@@ -199,7 +200,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
}

@Override
-protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(
+protected List<SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>> createIndexTasks(
int replicas,
String baseSequenceName,
ObjectMapper sortingMapper,
@@ -217,7 +218,7 @@ protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(
// Kafka index task will pick up LegacyKafkaIndexTaskRunner without the below configuration.
context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);

-List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
+List<SeekableStreamIndexTask<Integer, Long, KafkaRecordEntity>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KafkaIndexTask(