diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
index 41c2c0a03258..53369cc6ad60 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
@@ -33,8 +33,6 @@
* key, and timestamp.
*
* NOTE: Any records with null values will be skipped, even if they contain non-null keys, or headers
- *
- * This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.
*/
public class KafkaRecordEntity extends ByteEntity
{
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java
new file mode 100644
index 000000000000..7d97fffed378
--- /dev/null
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.amazonaws.services.kinesis.model.Record;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.DateTimes;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.Objects;
+
+/**
+ * Kinesis-aware InputFormat. Allows for reading Kinesis-specific values that are stored in the {@link Record}. At
+ * this time, this input format only supports reading data from the following record components:
+ *
+ * - {@link Record#data}
+ * - {@link Record#approximateArrivalTimestamp}
+ * - {@link Record#partitionKey}
+ *
+ * This class can be extended easily to read other fields available in the Kinesis record.
+ */
+public class KinesisInputFormat implements InputFormat
+{
+ private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp";
+ private static final String DEFAULT_PARTITION_KEY_COLUMN_NAME = "kinesis.partitionKey";
+
+ // Since KinesisInputFormat blends data from the record properties and the payload, the timestamp spec may point to
+ // an attribute within either of these two sections. To handle scenarios where there is no timestamp value in the
+ // payload, we induce an artificial timestamp value to avoid unnecessary parse failures. Users in such situations
+ // can use the inputFormat's Kinesis record timestamp as the primary timestamp.
+ public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp";
+ private final TimestampSpec dummyTimestampSpec = new TimestampSpec(DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH);
+
+ private final InputFormat valueFormat;
+ private final String timestampColumnName;
+ private final String partitionKeyColumnName;
+
+ public KinesisInputFormat(
+ @JsonProperty("valueFormat") InputFormat valueFormat,
+ @JsonProperty("partitionKeyColumnName") @Nullable String partitionKeyColumnName,
+ @JsonProperty("timestampColumnName") @Nullable String timestampColumnName
+ )
+ {
+ this.valueFormat = Preconditions.checkNotNull(valueFormat, "valueFormat must not be null");
+ Preconditions.checkState(
+ !(timestampColumnName != null && timestampColumnName.equals(partitionKeyColumnName)),
+ "timestampColumnName and partitionKeyColumnName must be different"
+ );
+ this.partitionKeyColumnName = partitionKeyColumnName != null
+ ? partitionKeyColumnName
+ : DEFAULT_PARTITION_KEY_COLUMN_NAME;
+ this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME;
+ }
+
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
+ {
+ final SettableByteEntity settableByteEntitySource;
+ if (source instanceof SettableByteEntity) {
+ settableByteEntitySource = (SettableByteEntity) source;
+ } else {
+ settableByteEntitySource = new SettableByteEntity<>();
+ settableByteEntitySource.setEntity((KinesisRecordEntity) source);
+ }
+ InputRowSchema newInputRowSchema = new InputRowSchema(
+ dummyTimestampSpec,
+ inputRowSchema.getDimensionsSpec(),
+ inputRowSchema.getColumnsFilter(),
+ inputRowSchema.getMetricNames()
+ );
+ return new KinesisInputReader(
+ inputRowSchema,
+ settableByteEntitySource,
+ JsonInputFormat.withLineSplittable(valueFormat, false).createReader(
+ newInputRowSchema,
+ source,
+ temporaryDirectory
+ ),
+ partitionKeyColumnName,
+ timestampColumnName
+ );
+ }
+
+ @JsonProperty
+ public InputFormat getValueFormat()
+ {
+ return valueFormat;
+ }
+
+ @Nullable
+ @JsonProperty
+ public String getTimestampColumnName()
+ {
+ return timestampColumnName;
+ }
+
+ @Nullable
+ @JsonProperty
+ public String getPartitionKeyColumnName()
+ {
+ return partitionKeyColumnName;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KinesisInputFormat that = (KinesisInputFormat) o;
+ return Objects.equals(valueFormat, that.valueFormat)
+ && Objects.equals(timestampColumnName, that.timestampColumnName)
+ && Objects.equals(partitionKeyColumnName, that.partitionKeyColumnName);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(valueFormat, timestampColumnName, partitionKeyColumnName);
+ }
+}
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java
new file mode 100644
index 000000000000..d0c30280a2b7
--- /dev/null
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kinesis;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.indexing.seekablestream.SettableByteEntity;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class KinesisInputReader implements InputEntityReader
+{
+
+ private final InputRowSchema inputRowSchema;
+ private final SettableByteEntity source;
+ private final InputEntityReader valueParser;
+ private final String partitionKeyColumnName;
+ private final String timestampColumnName;
+
+ public KinesisInputReader(
+ InputRowSchema inputRowSchema,
+ SettableByteEntity source,
+ InputEntityReader valueParser,
+ String partitionKeyColumnName,
+ String timestampColumnName
+ )
+ {
+ this.inputRowSchema = inputRowSchema;
+ this.source = source;
+ this.valueParser = valueParser;
+ this.partitionKeyColumnName = partitionKeyColumnName;
+ this.timestampColumnName = timestampColumnName;
+
+ }
+
+ @Override
+ public CloseableIterator read() throws IOException
+ {
+ final KinesisRecordEntity record = source.getEntity();
+ final Map mergedHeaderMap = extractHeaders(record);
+
+ if (record.getRecord().getData() != null) {
+ return buildBlendedRows(valueParser, mergedHeaderMap);
+ } else {
+ return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
+ }
+ }
+
+ @Override
+ public CloseableIterator sample() throws IOException
+ {
+ final KinesisRecordEntity record = source.getEntity();
+ InputRowListPlusRawValues headers = extractHeaderSample(record);
+ if (record.getRecord().getData() != null) {
+ return buildBlendedRowsSample(valueParser, headers.getRawValues());
+ } else {
+ final List rows = Collections.singletonList(headers);
+ return CloseableIterators.withEmptyBaggage(rows.iterator());
+ }
+ }
+
+ private Map extractHeaders(KinesisRecordEntity record)
+ {
+ final Map mergedHeaderMap = new HashMap<>();
+ mergedHeaderMap.put(timestampColumnName, record.getRecord().getApproximateArrivalTimestamp().getTime());
+ mergedHeaderMap.put(partitionKeyColumnName, record.getRecord().getPartitionKey());
+ return mergedHeaderMap;
+ }
+
+ private CloseableIterator buildBlendedRows(
+ InputEntityReader valueParser,
+ Map headerKeyList
+ ) throws IOException
+ {
+ return valueParser.read().map(
+ r -> {
+ final HashSet newDimensions = new HashSet<>(r.getDimensions());
+ final Map event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList);
+ newDimensions.addAll(headerKeyList.keySet());
+ // Remove the dummy timestamp added in KinesisInputFormat
+ newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
+
+ final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event);
+ return new MapBasedInputRow(
+ timestamp,
+ MapInputRowParser.findDimensions(
+ inputRowSchema.getTimestampSpec(),
+ inputRowSchema.getDimensionsSpec(),
+ newDimensions
+ ),
+ event
+ );
+ }
+ );
+ }
+
+ private InputRowListPlusRawValues extractHeaderSample(KinesisRecordEntity record)
+ {
+ Map mergedHeaderMap = extractHeaders(record);
+ return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
+ }
+
+ private CloseableIterator buildBlendedRowsSample(
+ InputEntityReader valueParser,
+ Map headerKeyList
+ ) throws IOException
+ {
+ return valueParser.sample().map(
+ rowAndValues -> {
+ if (rowAndValues.getParseException() != null) {
+ return rowAndValues;
+ }
+ List newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
+ List