From ccdbc746c1ff2690f94f0b6f15ed1a7b041a1a19 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 10 Jun 2024 16:29:40 -0400 Subject: [PATCH 01/17] SQL syntax error should target USER persona --- .../druid/sql/calcite/planner/DruidPlanner.java | 8 +------- .../druid/sql/calcite/planner/QueryHandler.java | 11 +---------- .../druid/sql/calcite/BaseCalciteQueryTest.java | 6 +----- 3 files changed, 3 insertions(+), 22 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 4b697a0d5dfa..cf1d22eb39b4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -380,13 +380,7 @@ public static DruidException translateException(Exception e) } } - return DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.UNCATEGORIZED) - .build( - inner, - "Unable to parse the SQL, unrecognized error from calcite: [%s]", - inner.getMessage() - ); + return InvalidSqlInput.exception(inner.getMessage()); } catch (RelOptPlanner.CannotPlanException inner) { return DruidException.forPersona(DruidException.Persona.USER) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 9f15d3822866..2d292621b57d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -684,16 +684,7 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e .ofCategory(DruidException.Category.UNSUPPORTED) .build(exception, "Unhandled Query Planning Failure, see broker logs for details"); } else { - // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong. 
- // For this reason, we consider these as targetting a more expert persona, i.e. the admin instead of the actual - // user. - throw DruidException.forPersona(DruidException.Persona.ADMIN) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build( - exception, - "Query could not be planned. A possible reason is [%s]", - errorMessage - ); + throw InvalidSqlInput.exception("Query could not be planned. A possible reason is [%s]", errorMessage); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index dfee7d0e3a22..9c34c89bd8fd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -664,11 +664,7 @@ public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final St private DruidExceptionMatcher buildUnplannableExceptionMatcher() { - if (testBuilder().isDecoupledMode()) { - return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); - } else { - return new DruidExceptionMatcher(Persona.ADMIN, Category.INVALID_INPUT, "general"); - } + return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); } /** From abe109262edf128e67f6373b96dc62ede370e3ea Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 11 Jun 2024 14:34:12 -0400 Subject: [PATCH 02/17] * revert change to queryHandler and related tests, based on review comments --- .../druid/sql/calcite/planner/QueryHandler.java | 11 ++++++++++- .../druid/sql/calcite/BaseCalciteQueryTest.java | 6 +++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 2d292621b57d..9f15d3822866 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -684,7 +684,16 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e .ofCategory(DruidException.Category.UNSUPPORTED) .build(exception, "Unhandled Query Planning Failure, see broker logs for details"); } else { - throw InvalidSqlInput.exception("Query could not be planned. A possible reason is [%s]", errorMessage); + // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong. + // For this reason, we consider these as targetting a more expert persona, i.e. the admin instead of the actual + // user. + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + exception, + "Query could not be planned. A possible reason is [%s]", + errorMessage + ); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 9c34c89bd8fd..dfee7d0e3a22 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -664,7 +664,11 @@ public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final St private DruidExceptionMatcher buildUnplannableExceptionMatcher() { - return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); + if (testBuilder().isDecoupledMode()) { + return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); + } else { + return new DruidExceptionMatcher(Persona.ADMIN, Category.INVALID_INPUT, "general"); + } } /** From 5d730475dca1c90cc2fd4abc15d890a1d812acb1 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 11 Jun 2024 15:58:16 -0400 Subject: [PATCH 03/17] * add test --- .../druid/sql/calcite/CalciteInsertDmlTest.java | 12 ++++++++++++ 1 file changed, 12 
insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index bb9c03aa3c88..b40a4c87c3ab 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1626,6 +1626,18 @@ public void testInsertWithInvalidSelectStatement() .verify(); } + @Test + public void testInsertWithLongIdentifer() + { + // This test fails because an identifer is specified of length 200, which exceeds the length limit of 128 + // characters. + String longIdentifer = new String(new char[200]).replace('\0', 'a'); + testIngestionQuery() + .sql(StringUtils.format("INSERT INTO t SELECT %s FROM foo PARTITIONED BY ALL", longIdentifer)) // count is a keyword + .expectValidationError(invalidSqlContains(StringUtils.format("Length of identifier '%s' must be less than or equal to 128 characters", longIdentifer))) + .verify(); + } + @Test public void testInsertWithUnnamedColumnInSelectStatement() { From d9fd4e46774576277397cff38445c331d1a3b337 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 29 Jul 2024 17:13:58 -0400 Subject: [PATCH 04/17] Introduce KinesisRecordEntity to support Kinesis headers in InputFormats --- .../input/kinesis/KinesisRecordEntity.java | 53 +++++++++++ .../indexing/kinesis/KinesisIndexTask.java | 6 +- .../kinesis/KinesisIndexTaskRunner.java | 10 +- .../kinesis/KinesisRecordSupplier.java | 28 +++--- .../kinesis/supervisor/KinesisSupervisor.java | 10 +- .../kinesis/KinesisIndexTaskTest.java | 94 +++++++++++-------- .../kinesis/KinesisRecordSupplierTest.java | 63 ++++++------- .../kinesis/KinesisSamplerSpecTest.java | 30 +++--- .../supervisor/KinesisSupervisorTest.java | 4 +- 9 files changed, 180 insertions(+), 118 deletions(-) create mode 100644 extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java diff 
--git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java new file mode 100644 index 000000000000..14fb92108a13 --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kinesis; + +import com.amazonaws.services.kinesis.model.Record; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.indexing.kinesis.KinesisRecordSupplier; + +/** + * A {@link ByteEntity} generated by {@link KinesisRecordSupplier} and fed to any {@link InputFormat} used by kinesis + * indexing tasks. + *

+ * It can be used as a regular ByteEntity, in which case the kinesis record value is returned, but the {@link #getRecord} + * method also allows Kinesis-aware {@link InputFormat} implementations to read the full kinesis record, including + * timestamp, encryption key, partition key, and sequence number. + *

+ * NOTE: Any records with null values will be skipped, even if they contain non-null keys or headers. + *

+ * This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions. + */ +public class KinesisRecordEntity extends ByteEntity +{ + private final Record record; + + public KinesisRecordEntity(Record record) + { + super(record.getData()); + this.record = record; + } + + public Record getRecord() + { + return record; + } +} diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index fb019f10030b..bea69d96c3d2 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -27,7 +27,7 @@ import com.google.common.base.Preconditions; import com.google.inject.name.Named; import org.apache.druid.common.aws.AWSCredentialsConfig; -import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.TaskResource; @@ -46,7 +46,7 @@ import java.util.Map; import java.util.Set; -public class KinesisIndexTask extends SeekableStreamIndexTask +public class KinesisIndexTask extends SeekableStreamIndexTask { private static final String TYPE = "index_kinesis"; @@ -100,7 +100,7 @@ public TaskStatus runTask(TaskToolbox toolbox) } @Override - protected SeekableStreamIndexTaskRunner createTaskRunner() + protected SeekableStreamIndexTaskRunner createTaskRunner() { //noinspection unchecked return new KinesisIndexTaskRunner( diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java index 75f23da0e1f6..72e61635912c 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; @@ -49,7 +49,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentMap; -public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner +public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner { private static final EmittingLogger log = new EmittingLogger(KinesisIndexTaskRunner.class); private static final long POLL_TIMEOUT = 100; @@ -81,8 +81,8 @@ protected String getNextStartOffset(String sequenceNumber) @Nonnull @Override - protected List> getRecords( - RecordSupplier recordSupplier, TaskToolbox toolbox + protected List> getRecords( + RecordSupplier recordSupplier, TaskToolbox toolbox ) { return recordSupplier.poll(POLL_TIMEOUT); @@ -119,7 +119,7 @@ protected OrderedSequenceNumber createSequenceNumber(String sequenceNumb @Override protected void possiblyResetDataSourceMetadata( TaskToolbox toolbox, - RecordSupplier recordSupplier, + RecordSupplier recordSupplier, Set> assignment ) { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 36047ce429db..07a0da32a954 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -26,6 +26,7 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsRequest; @@ -49,7 +50,7 @@ import org.apache.druid.common.aws.AWSClientUtil; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.aws.AWSCredentialsUtils; -import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; @@ -69,7 +70,6 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -94,7 +94,7 @@ * This class implements a local buffer for storing fetched Kinesis records. Fetching is done * in background threads. 
*/ -public class KinesisRecordSupplier implements RecordSupplier +public class KinesisRecordSupplier implements RecordSupplier { private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class); private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000; @@ -210,7 +210,7 @@ private Runnable fetchRecords() // used for retrying on InterruptedException GetRecordsResult recordsResult = null; - OrderedPartitionableRecord currRecord; + OrderedPartitionableRecord currRecord; long recordBufferOfferWaitMillis; try { @@ -248,7 +248,7 @@ private Runnable fetchRecords() // list will come back empty if there are no records for (Record kinesisRecord : recordsResult.getRecords()) { - final List data; + final List data; if (deaggregateHandle == null || getDataHandle == null) { throw new ISE("deaggregateHandle or getDataHandle is null!"); @@ -256,15 +256,15 @@ private Runnable fetchRecords() data = new ArrayList<>(); - final List userRecords = (List) deaggregateHandle.invokeExact( + final List userRecords = (List) deaggregateHandle.invokeExact( Collections.singletonList(kinesisRecord) ); int recordSize = 0; - for (Object userRecord : userRecords) { - ByteEntity byteEntity = new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord)); - recordSize += byteEntity.getBuffer().array().length; - data.add(byteEntity); + for (UserRecord userRecord : userRecords) { + KinesisRecordEntity kinesisRecordEntity = new KinesisRecordEntity(userRecord); + recordSize += kinesisRecordEntity.getBuffer().array().length; + data.add(kinesisRecordEntity); } @@ -408,7 +408,7 @@ private long getPartitionTimeLag() private final ConcurrentMap, PartitionResource> partitionResources = new ConcurrentHashMap<>(); - private MemoryBoundLinkedBlockingQueue> records; + private MemoryBoundLinkedBlockingQueue> records; private final boolean backgroundFetchEnabled; private volatile boolean closed = false; @@ -615,12 +615,12 @@ public String getPosition(StreamPartition 
partition) @Nonnull @Override - public List> poll(long timeout) + public List> poll(long timeout) { start(); try { - List>> polledRecords = new ArrayList<>(); + List>> polledRecords = new ArrayList<>(); records.drain( polledRecords, @@ -1040,7 +1040,7 @@ private void filterBufferAndResetBackgroundFetch(Set> pa } // filter records in buffer and only retain ones whose partition was not seeked - MemoryBoundLinkedBlockingQueue> newQ = + MemoryBoundLinkedBlockingQueue> newQ = new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes); records.stream() diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index a142f4147627..2f00c8c16cc9 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.utils.IdUtils; -import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.kinesis.KinesisDataSourceMetadata; @@ -74,7 +74,7 @@ * tasks to satisfy the desired number of replicas. As tasks complete, new tasks are queued to process the next range of * Kinesis sequences. 
*/ -public class KinesisSupervisor extends SeekableStreamSupervisor +public class KinesisSupervisor extends SeekableStreamSupervisor { private static final EmittingLogger log = new EmittingLogger(KinesisSupervisor.class); @@ -150,7 +150,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( } @Override - protected List> createIndexTasks( + protected List> createIndexTasks( int replicas, String baseSequenceName, ObjectMapper sortingMapper, @@ -164,7 +164,7 @@ protected List> createIndexT final Map context = createBaseTaskContexts(); context.put(CHECKPOINTS_CTX_KEY, checkpoints); - List> taskList = new ArrayList<>(); + List> taskList = new ArrayList<>(); for (int i = 0; i < replicas; i++) { String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName); taskList.add(new KinesisIndexTask( @@ -183,7 +183,7 @@ protected List> createIndexT @Override - protected RecordSupplier setupRecordSupplier() throws RuntimeException + protected RecordSupplier setupRecordSupplier() throws RuntimeException { KinesisSupervisorIOConfig ioConfig = spec.getIoConfig(); KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 527a6738ffe9..80bded2031d4 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.kinesis; +import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -41,6 +42,7 @@ import 
org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.report.IngestionStatsAndErrors; @@ -127,39 +129,39 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase private static final String SHARD_ID0 = "0"; private static final List RECORDS = Arrays.asList( - createRecord("1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), - createRecord("1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), - createRecord("1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), - createRecord("1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), - createRecord("1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), - createRecord("1", "5", jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), - createRecord("1", "6", new ByteEntity(StringUtils.toUtf8("unparseable"))), - createRecord("1", "7", new ByteEntity(StringUtils.toUtf8(""))), - createRecord("1", "8", new ByteEntity(StringUtils.toUtf8("{}"))), - createRecord("1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")), - createRecord("1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")), - createRecord("1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")), - createRecord("1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")), - createRecord("0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")), - createRecord("0", "1", jb("2011", "h", "y", "10", "20.0", "1.0")) + createRecord("1", "0", kjb("2008", "a", "y", "10", "20.0", "1.0")), + createRecord("1", "1", kjb("2009", "b", "y", "10", "20.0", "1.0")), + createRecord("1", "2", kjb("2010", "c", "y", "10", "20.0", "1.0")), + createRecord("1", "3", kjb("2011", "d", "y", "10", "20.0", "1.0")), + createRecord("1", "4", kjb("2011", "e", 
"y", "10", "20.0", "1.0")), + createRecord("1", "5", kjb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), + createRecord("1", "6", new KinesisRecordEntity(new Record().withData(new ByteEntity(StringUtils.toUtf8("unparseable")).getBuffer()))), + createRecord("1", "7", new KinesisRecordEntity(new Record().withData(new ByteEntity(StringUtils.toUtf8("")).getBuffer()))), + createRecord("1", "8", new KinesisRecordEntity(new Record().withData(new ByteEntity(StringUtils.toUtf8("{}")).getBuffer()))), + createRecord("1", "9", kjb("2013", "f", "y", "10", "20.0", "1.0")), + createRecord("1", "10", kjb("2049", "f", "y", "notanumber", "20.0", "1.0")), + createRecord("1", "11", kjb("2049", "f", "y", "10", "notanumber", "1.0")), + createRecord("1", "12", kjb("2049", "f", "y", "10", "20.0", "notanumber")), + createRecord("0", "0", kjb("2012", "g", "y", "10", "20.0", "1.0")), + createRecord("0", "1", kjb("2011", "h", "y", "10", "20.0", "1.0")) ); private static final List SINGLE_PARTITION_RECORDS = Arrays.asList( - createRecord("1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), - createRecord("1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), - createRecord("1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), - createRecord("1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")), - createRecord("1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")), - createRecord("1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")), - createRecord("1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")), - createRecord("1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")), - createRecord("1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")), - createRecord("1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")), - createRecord("1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")), - createRecord("1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")), - createRecord("1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")), - createRecord("1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")), - 
createRecord("1", "14", jb("2013", "e", "y", "10", "20.0", "1.0")) + createRecord("1", "0", kjb("2008", "a", "y", "10", "20.0", "1.0")), + createRecord("1", "1", kjb("2009", "b", "y", "10", "20.0", "1.0")), + createRecord("1", "2", kjb("2010", "c", "y", "10", "20.0", "1.0")), + createRecord("1", "3", kjb("2011", "d", "y", "10", "20.0", "1.0")), + createRecord("1", "4", kjb("2011", "e", "y", "10", "20.0", "1.0")), + createRecord("1", "5", kjb("2012", "a", "y", "10", "20.0", "1.0")), + createRecord("1", "6", kjb("2013", "b", "y", "10", "20.0", "1.0")), + createRecord("1", "7", kjb("2010", "c", "y", "10", "20.0", "1.0")), + createRecord("1", "8", kjb("2011", "d", "y", "10", "20.0", "1.0")), + createRecord("1", "9", kjb("2011", "e", "y", "10", "20.0", "1.0")), + createRecord("1", "10", kjb("2008", "a", "y", "10", "20.0", "1.0")), + createRecord("1", "11", kjb("2009", "b", "y", "10", "20.0", "1.0")), + createRecord("1", "12", kjb("2010", "c", "y", "10", "20.0", "1.0")), + createRecord("1", "13", kjb("2012", "d", "y", "10", "20.0", "1.0")), + createRecord("1", "14", kjb("2013", "e", "y", "10", "20.0", "1.0")) ); private static KinesisRecordSupplier recordSupplier; @@ -272,12 +274,12 @@ private static KinesisRecord clone(KinesisRecord record) record.getPartitionId(), record.getSequenceNumber(), record.getData().stream() - .map(entity -> new ByteEntity(entity.getBuffer())) + .map(entity -> new KinesisRecordEntity(new Record().withData(entity.getBuffer()))) .collect(Collectors.toList()) ); } - private static List> clone( + private static List> clone( List records, int start, int end @@ -289,14 +291,14 @@ private static List> clon /** * Records can only be read once, hence we must use fresh records every time. 
*/ - private static List> clone( + private static List> clone( List records ) { return records.stream().map(KinesisIndexTaskTest::clone).collect(Collectors.toList()); } - private static KinesisRecord createRecord(String partitionId, String sequenceNumber, ByteEntity entity) + private static KinesisRecord createRecord(String partitionId, String sequenceNumber, KinesisRecordEntity entity) { return new KinesisRecord(STREAM, partitionId, sequenceNumber, Collections.singletonList(entity)); } @@ -1697,7 +1699,7 @@ public void testRestoreAfterPersistingSequences() throws Exception maxRowsPerSegment = 2; maxRecordsPerPoll = 1; maxBytesPerPoll = 1_000_000; - List> records = + List> records = clone(SINGLE_PARTITION_RECORDS); recordSupplier.assign(EasyMock.anyObject()); @@ -2148,7 +2150,7 @@ public void testEndOfShard() throws Exception recordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - List> eosRecord = ImmutableList.of( + List> eosRecord = ImmutableList.of( new OrderedPartitionableRecord<>(STREAM, SHARD_ID1, KinesisSequenceNumber.END_OF_SHARD_MARKER, null) ); @@ -2454,6 +2456,18 @@ private boolean isTaskReading(KinesisIndexTask task) return task.getRunner().getStatus() == SeekableStreamIndexTaskRunner.Status.READING; } + private static KinesisRecordEntity kjb( + String timestamp, + String dim1, + String dim2, + String dimLong, + String dimFloat, + String met1 + ) + { + return new KinesisRecordEntity(new Record().withData(jb(timestamp, dim1, dim2, dimLong, dimFloat, met1).getBuffer())); + } + @JsonTypeName("index_kinesis") private static class TestableKinesisIndexTask extends KinesisIndexTask { @@ -2497,15 +2511,15 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) /** * Utility class to keep the test code more readable. 
*/ - private static class KinesisRecord extends OrderedPartitionableRecord + private static class KinesisRecord extends OrderedPartitionableRecord { - private final List data; + private final List data; public KinesisRecord( String stream, String partitionId, String sequenceNumber, - List data + List data ) { super(stream, partitionId, sequenceNumber, data); @@ -2514,7 +2528,7 @@ public KinesisRecord( @Nonnull @Override - public List getData() + public List getData() { return data; } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 5fcf81139eb6..7c59ad61ac09 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; @@ -99,34 +100,26 @@ public class KinesisRecordSupplierTest extends EasyMockSupport new Record().withData(jb("2012", "g", "y", "10", "20.0", "1.0")).withSequenceNumber("8"), new Record().withData(jb("2011", "h", "y", "10", "20.0", "1.0")).withSequenceNumber("9") ); - private static final List> ALL_RECORDS = ImmutableList.>builder() - .addAll(SHARD0_RECORDS.stream() - .map(x -> new OrderedPartitionableRecord<>( - STREAM, - SHARD_ID0, - x.getSequenceNumber(), - Collections - .singletonList( - new ByteEntity( - x.getData())) - )) 
- .collect( - Collectors - .toList())) - .addAll(SHARD1_RECORDS.stream() - .map(x -> new OrderedPartitionableRecord<>( - STREAM, - SHARD_ID1, - x.getSequenceNumber(), - Collections - .singletonList( - new ByteEntity( - x.getData())) - )) - .collect( - Collectors - .toList())) - .build(); + private static final List> ALL_RECORDS = ImmutableList.>builder() + .addAll(SHARD0_RECORDS.stream() + .map(x -> new OrderedPartitionableRecord<>( + STREAM, + SHARD_ID0, + x.getSequenceNumber(), + Collections.singletonList(new KinesisRecordEntity(new Record().withData(new ByteEntity(x.getData()).getBuffer()))) + )) + .collect( + Collectors + .toList())) + .addAll(SHARD1_RECORDS.stream() + .map(x -> new OrderedPartitionableRecord<>( + STREAM, + SHARD_ID1, + x.getSequenceNumber(), + Collections.singletonList(new KinesisRecordEntity(new Record().withData(new ByteEntity(x.getData()).getBuffer()))) + )) + .collect(Collectors.toList())) + .build(); private static ByteBuffer jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1) @@ -316,7 +309,7 @@ private static GetRecordsRequest generateGetRecordsWithLimitReq(String shardIter } // filter out EOS markers - private static List> cleanRecords(List> records) + private static List> cleanRecords(List> records) { return records.stream() .filter(x -> !x.getSequenceNumber() @@ -398,7 +391,7 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException Thread.sleep(100); } - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -457,7 +450,7 @@ public void testPollWithKinesisNonRetryableFailure() throws InterruptedException } Assert.assertFalse(recordSupplier.isAnyFetchActive()); - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -531,7 +524,7 @@ public void testSeek() Thread.sleep(100); } - 
List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); @@ -687,7 +680,7 @@ public void testPollAfterSeek() Thread.sleep(100); } - OrderedPartitionableRecord firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); + OrderedPartitionableRecord firstRecord = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); Assert.assertEquals( ALL_RECORDS.get(7), @@ -705,7 +698,7 @@ public void testPollAfterSeek() } - OrderedPartitionableRecord record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); + OrderedPartitionableRecord record2 = recordSupplier.poll(POLL_TIMEOUT_MILLIS).get(0); Assert.assertEquals(ALL_RECORDS.get(9), record2); // only one partition in this test. second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS @@ -776,7 +769,7 @@ public void testPollDeaggregate() throws InterruptedException Thread.sleep(100); } - List> polledRecords = cleanRecords(recordSupplier.poll( + List> polledRecords = cleanRecords(recordSupplier.poll( POLL_TIMEOUT_MILLIS)); verifyAll(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index b0ba730a3502..63144c6a9353 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.kinesis; +import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -27,7 +28,6 @@ import org.apache.druid.client.indexing.SamplerSpec; import 
org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.common.config.NullHandling; -import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.FloatDimensionSchema; import org.apache.druid.data.input.impl.InputRowParser; @@ -37,6 +37,7 @@ import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; @@ -63,6 +64,7 @@ import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -99,7 +101,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport private final KinesisRecordSupplier recordSupplier = mock(KinesisRecordSupplier.class); - private static List> generateRecords(String stream) + private static List> generateRecords(String stream) { return ImmutableList.of( new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), @@ -115,9 +117,9 @@ private static List> gene stream, "1", "6", - Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable"))) + Collections.singletonList(new KinesisRecordEntity(new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("unparseable"))))) ), - new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}")))) + new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new KinesisRecordEntity(new Record().withData(ByteBuffer.wrap(StringUtils.toUtf8("{}")))))) ); } @@ -428,19 +430,19 @@ private void 
runSamplerAndCompareResponse(SamplerSpec samplerSpec, boolean useIn Assert.assertFalse(it.hasNext()); } - private static List jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1) + private static List jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1) { try { - return Collections.singletonList(new ByteEntity(new ObjectMapper().writeValueAsBytes( + return Collections.singletonList(new KinesisRecordEntity(new Record().withData(ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes( ImmutableMap.builder() - .put("timestamp", ts) - .put("dim1", dim1) - .put("dim2", dim2) - .put("dimLong", dimLong) - .put("dimFloat", dimFloat) - .put("met1", met1) - .build() - ))); + .put("timestamp", ts) + .put("dim1", dim1) + .put("dim2", dim2) + .put("dimLong", dimLong) + .put("dimFloat", dimFloat) + .put("met1", met1) + .build() + ))))); } catch (Exception e) { throw new RuntimeException(e); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 9001f148e99b..e6ed27c9cecc 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -27,12 +27,12 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.StringDimensionSchema; import 
org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.kinesis.KinesisRecordEntity; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskInfoProvider; @@ -5656,7 +5656,7 @@ public String generateSequenceName( } @Override - protected RecordSupplier setupRecordSupplier() + protected RecordSupplier setupRecordSupplier() { return supervisorRecordSupplier; } From ba4d447d207bae2d389dc3a50d4df5d951e5b92e Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 30 Jul 2024 03:03:26 -0400 Subject: [PATCH 05/17] * add kinesisInputFormat and Reader, and tests --- .../input/kinesis/KinesisInputFormat.java | 126 ++++ .../input/kinesis/KinesisInputReader.java | 256 +++++++ .../input/kinesis/KinesisInputFormatTest.java | 661 ++++++++++++++++++ 3 files changed, 1043 insertions(+) create mode 100644 extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java create mode 100644 extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java create mode 100644 extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java new file mode 100644 index 000000000000..e03d11454ceb --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kinesis; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.DateTimes; + +import javax.annotation.Nullable; + +import java.io.File; +import java.util.Objects; + +public class KinesisInputFormat implements InputFormat +{ + private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp"; + public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; + + // Since KinesisInputFormat blends data from header, and payload, timestamp spec can be pointing to an attribute within one of these + // 2 sections. To handle scenarios where there is no timestamp value either in payload, we induce an artifical timestamp value + // to avoid unnecessary parser barf out. Users in such situations can use the inputFormat's kinesis record timestamp as its primary timestamp. 
+ private final TimestampSpec dummyTimestampSpec = new TimestampSpec(DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH); + + private final InputFormat valueFormat; + private final String timestampColumnName; + + public KinesisInputFormat( + @JsonProperty("valueFormat") InputFormat valueFormat, + @JsonProperty("timestampColumnName") @Nullable String timestampColumnName + ) + { + this.valueFormat = Preconditions.checkNotNull(valueFormat, "valueFormat must not be null"); + this.timestampColumnName = timestampColumnName != null ? timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME; + } + + @Override + public boolean isSplittable() + { + return false; + } + + @Override + public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) + { + final SettableByteEntity settableByteEntitySource; + if (source instanceof SettableByteEntity) { + settableByteEntitySource = (SettableByteEntity) source; + } else { + settableByteEntitySource = new SettableByteEntity<>(); + settableByteEntitySource.setEntity((KinesisRecordEntity) source); + } + InputRowSchema newInputRowSchema = new InputRowSchema( + dummyTimestampSpec, + inputRowSchema.getDimensionsSpec(), + inputRowSchema.getColumnsFilter(), + inputRowSchema.getMetricNames() + ); + return new KinesisInputReader( + inputRowSchema, + settableByteEntitySource, + JsonInputFormat.withLineSplittable(valueFormat, false).createReader( + newInputRowSchema, + source, + temporaryDirectory + ), + timestampColumnName + ); + } + + @JsonProperty + public InputFormat getValueFormat() + { + return valueFormat; + } + + @Nullable + @JsonProperty + public String getTimestampColumnName() + { + return timestampColumnName; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + KinesisInputFormat that = (KinesisInputFormat) o; + return Objects.equals(valueFormat, that.valueFormat) + && 
Objects.equals(timestampColumnName, that.timestampColumnName); + } + + @Override + public int hashCode() + { + return Objects.hash(valueFormat, timestampColumnName); + } +} diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java new file mode 100644 index 000000000000..80c358b528d2 --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.kinesis; + +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class KinesisInputReader implements InputEntityReader +{ + + private final InputRowSchema inputRowSchema; + private final SettableByteEntity source; + private final InputEntityReader valueParser; + private final String timestampColumnName; + + public KinesisInputReader( + InputRowSchema inputRowSchema, + SettableByteEntity source, + InputEntityReader valueParser, + String timestampColumnName + ) + { + this.inputRowSchema = inputRowSchema; + this.source = source; + this.valueParser = valueParser; + this.timestampColumnName = timestampColumnName; + + } + + @Override + public CloseableIterator read() throws IOException + { + final KinesisRecordEntity record = source.getEntity(); + final Map mergedHeaderMap = extractHeaders(record); + + // Ignore tombstone records that have null values. 
+ if (record.getRecord().getData() != null) { + return buildBlendedRows(valueParser, mergedHeaderMap); + } else { + return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator()); + } + } + + @Override + public CloseableIterator sample() throws IOException + { + final KinesisRecordEntity record = source.getEntity(); + InputRowListPlusRawValues keysAndHeader = extractHeaderAndKeysSample(record); + if (record.getRecord().getData() != null) { + return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues()); + } else { + final List rows = Collections.singletonList(keysAndHeader); + return CloseableIterators.withEmptyBaggage(rows.iterator()); + } + } + + private Map extractHeaders(KinesisRecordEntity record) + { + final Map mergedHeaderMap = new HashMap<>(); + // Add kafka record timestamp to the mergelist, we will skip record timestamp if the same key exists already in + // the header list + mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().getApproximateArrivalTimestamp().getTime()); + + return mergedHeaderMap; + } + + private CloseableIterator buildBlendedRows( + InputEntityReader valueParser, + Map headerKeyList + ) throws IOException + { + return valueParser.read().map( + r -> { + final HashSet newDimensions = new HashSet<>(r.getDimensions()); + final Map event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList); + newDimensions.addAll(headerKeyList.keySet()); + // Remove the dummy timestamp added in KafkaInputFormat + newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); + + final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event); + return new MapBasedInputRow( + timestamp, + MapInputRowParser.findDimensions( + inputRowSchema.getTimestampSpec(), + inputRowSchema.getDimensionsSpec(), + newDimensions + ), + event + ); + } + ); + } + + private InputRowListPlusRawValues extractHeaderAndKeysSample(KinesisRecordEntity record) + { + Map 
mergedHeaderMap = extractHeaders(record); + return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap); + } + + private CloseableIterator buildBlendedRowsSample( + InputEntityReader valueParser, + Map headerKeyList + ) throws IOException + { + return valueParser.sample().map( + rowAndValues -> { + if (rowAndValues.getParseException() != null) { + return rowAndValues; + } + List newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size()); + List> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size()); + + for (Map raw : rowAndValues.getRawValuesList()) { + newRawRows.add(buildBlendedEventMap(raw::get, raw.keySet(), headerKeyList)); + } + for (InputRow r : rowAndValues.getInputRows()) { + if (r != null) { + final HashSet newDimensions = new HashSet<>(r.getDimensions()); + final Map event = buildBlendedEventMap( + r::getRaw, + newDimensions, + headerKeyList + ); + newDimensions.addAll(headerKeyList.keySet()); + // Remove the dummy timestamp added in KafkaInputFormat + newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); + newInputRows.add( + new MapBasedInputRow( + inputRowSchema.getTimestampSpec().extractTimestamp(event), + MapInputRowParser.findDimensions( + inputRowSchema.getTimestampSpec(), + inputRowSchema.getDimensionsSpec(), + newDimensions + ), + event + ) + ); + } + } + return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, null); + } + ); + } + + private List buildInputRowsForMap(Map headerKeyList) + { + return Collections.singletonList( + new MapBasedInputRow( + inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList), + MapInputRowParser.findDimensions( + inputRowSchema.getTimestampSpec(), + inputRowSchema.getDimensionsSpec(), + headerKeyList.keySet() + ), + headerKeyList + ) + ); + } + + private Map buildBlendedEventMap( + Function getRowValue, + Set rowDimensions, + Map fallback + ) + { + final Set keySet = new 
HashSet<>(fallback.keySet()); + keySet.addAll(rowDimensions); + + return new AbstractMap() + { + @Override + public Object get(Object key) + { + final String skey = (String) key; + final Object val = getRowValue.apply(skey); + if (val == null) { + return fallback.get(skey); + } + return val; + } + + @Override + public Set keySet() + { + return keySet; + } + + @Override + public Set> entrySet() + { + return keySet().stream() + .map( + field -> new Entry() + { + @Override + public String getKey() + { + return field; + } + + @Override + public Object getValue() + { + return get(field); + } + + @Override + public Object setValue(final Object value) + { + throw new UnsupportedOperationException(); + } + } + ) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + }; + } +} diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java new file mode 100644 index 000000000000..d15af6c97aae --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java @@ -0,0 +1,661 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.kinesis; + +import com.amazonaws.services.kinesis.model.Record; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.seekablestream.SettableByteEntity; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +public class KinesisInputFormatTest +{ + static { + NullHandling.initializeForTests(); + } + + + private static final String KINESIS_APPROXIMATE_TIME_DATE = "2024-07-29"; + private static final long 
KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS = DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis(); + private static final String DATA_TIMSTAMP_DATE = "2024-07-30"; + + private static final byte[] SIMPLE_JSON_VALUE_BYTES = StringUtils.toUtf8( + TestUtils.singleQuoteToStandardJson( + "{" + + " 'timestamp': '" + DATA_TIMSTAMP_DATE + "'," + + " 'bar': null," + + " 'foo': 'x'," + + " 'baz': 4," + + " 'o': {'mg': 1}" + + "}" + ) + ); + + private KinesisInputFormat format; + + @Before + public void setUp() + { + format = new KinesisInputFormat( + // Value Format + new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, + false + ), + "kinesis.newts.timestamp" + ); + } + + @Test + public void testSerde() throws JsonProcessingException + { + final ObjectMapper mapper = new ObjectMapper(); + KinesisInputFormat kif = new KinesisInputFormat( + // Value Format + new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, + false + ), + "kinesis.newts.timestamp" + ); + Assert.assertEquals(format, kif); + + final byte[] formatBytes = mapper.writeValueAsBytes(format); + final 
byte[] kifBytes = mapper.writeValueAsBytes(kif); + Assert.assertArrayEquals(formatBytes, kifBytes); + } + + @Test + public void testTimestampFromHeader() throws IOException + { + KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("kinesis.newts.timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + 
numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testWithMultipleMixedRecordsTimestampFromHeader() throws IOException + { + final byte[][] values = new byte[5][]; + for (int i = 0; i < values.length; i++) { + values[i] = StringUtils.toUtf8( + "{\n" + + " \"timestamp\": \"2024-07-2" + i + "\",\n" + + " \"bar\": null,\n" + + " \"foo\": \"x\",\n" + + " \"baz\": 4,\n" + + " \"index\": " + i + ",\n" + + " \"o\": {\n" + + " \"mg\": 1\n" + + " }\n" + + "}" + ); + } + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("kinesis.newts.timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kinesis.newts.timestamp" + ) + ) + ), + ColumnsFilter.all() + ), + settableByteEntity, + null + ); + + for (int i = 0; i < values.length; i++) { + KinesisRecordEntity inputEntity = makeInputEntity(values[i], DateTimes.of("2024-07-1" + i).getMillis()); + settableByteEntity.setEntity(inputEntity); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + // Payload verification + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + Assert.assertEquals(DateTimes.of("2024-07-1" + i), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of("2024-07-1" + i).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", 
Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + } + + @Test + public void testTimestampFromData() throws IOException + { + KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of(DATA_TIMSTAMP_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + 
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testWithMultipleMixedRecordsTimestampFromData() throws IOException + { + final byte[][] values = new byte[5][]; + for (int i = 0; i < values.length; i++) { + values[i] = StringUtils.toUtf8( + "{\n" + + " \"timestamp\": \"2024-07-2" + i + "\",\n" + + " \"bar\": null,\n" + + " \"foo\": \"x\",\n" + + " \"baz\": 4,\n" + + " \"index\": " + i + ",\n" + + " \"o\": {\n" + + " \"mg\": 1\n" + + " }\n" + + "}" + ); + } + + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kinesis.newts.timestamp" + ) + ) + ), + ColumnsFilter.all() + ), + settableByteEntity, + null + ); + + for (int i = 0; i < values.length; i++) { + KinesisRecordEntity inputEntity = makeInputEntity(values[i], KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + settableByteEntity.setEntity(inputEntity); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + // Payload verification + // this isn't super realistic, since most of these columns are not actually 
defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + Assert.assertEquals(DateTimes.of("2024-07-2" + i), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of("2024-07-29").getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + } + + @Test + public void testMissingTimestampThrowsException() throws IOException + { + KinesisRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("time", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kinesis.newts.timestamp" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + try (CloseableIterator iterator = reader.read()) { + while (iterator.hasNext()) { + Throwable t = Assert.assertThrows(ParseException.class, iterator::next); + Assert.assertTrue( + t.getMessage().startsWith("Timestamp[null] is unparseable! 
Event: {") + ); + } + } + } + + @Test + public void testWithSchemaDiscovery() throws IOException + { + KinesisRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + List expectedDimensions = Arrays.asList( + "foo", + "kinesis.newts.timestamp", + "root_baz", + "o", + "bar", + "path_omg", + "jq_omg", + "jq_omg2", + "baz", + "root_baz2", + "path_omg2" + ); + Collections.sort(expectedDimensions); + Collections.sort(row.getDimensions()); + Assert.assertEquals( + expectedDimensions, + row.getDimensions() + ); + + // Payload verifications + Assert.assertEquals(DateTimes.of(DATA_TIMSTAMP_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + 
} + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testValueInCsvFormat() throws IOException + { + format = new KinesisInputFormat( + // Value Format + new CsvInputFormat( + Arrays.asList("foo", "bar", "timestamp", "baz"), + null, + false, + false, + 0 + ), + "kinesis.newts.timestamp" + ); + + KinesisRecordEntity inputEntity = + makeInputEntity(StringUtils.toUtf8("x,,2024-07-30,4"), KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kinesis.newts.timestamp" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + Assert.assertEquals( + Arrays.asList( + "bar", + "foo", + "kinesis.newts.timestamp" + ), + row.getDimensions() + ); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of("2024-07-30"), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertTrue(row.getDimension("bar").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testWithPartialDeclarationSchemaDiscovery() throws IOException + { 
+ KinesisRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + DimensionsSpec.builder().setDimensions( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar")) + ).useSchemaDiscovery(true).build(), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + List expectedDimensions = Arrays.asList( + "bar", + "foo", + "kinesis.newts.timestamp", + "root_baz", + "o", + "path_omg", + "jq_omg", + "jq_omg2", + "baz", + "root_baz2", + "path_omg2" + ); + Collections.sort(expectedDimensions); + Collections.sort(row.getDimensions()); + Assert.assertEquals( + expectedDimensions, + row.getDimensions() + ); + + // Payload verifications + Assert.assertEquals(DateTimes.of(DATA_TIMSTAMP_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + 
Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + private KinesisRecordEntity makeInputEntity( + byte[] payload, + long kinesisTimestampMillis) + { + return new KinesisRecordEntity( + new Record().withData(ByteBuffer.wrap(payload)) + .withApproximateArrivalTimestamp(new Date(kinesisTimestampMillis)) + ); + } + + private SettableByteEntity newSettableByteEntity(KinesisRecordEntity kinesisRecordEntity) + { + SettableByteEntity settableByteEntity = new SettableByteEntity<>(); + settableByteEntity.setEntity(kinesisRecordEntity); + return settableByteEntity; + } +} From 90e0f129871e68e503b8dc685a3d086fddc0bda8 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 30 Jul 2024 03:24:49 -0400 Subject: [PATCH 06/17] * bind KinesisInputFormat class to module --- .../druid/indexing/kinesis/KinesisIndexingServiceModule.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java index 0cce1a7e6983..5ac9022d001d 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java @@ -26,6 +26,7 @@ import com.google.inject.Binder; import com.google.inject.name.Names; import org.apache.druid.common.aws.AWSCredentialsConfig; +import org.apache.druid.data.input.kinesis.KinesisInputFormat; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig; @@ -50,7 +51,8 @@ public List getJacksonModules() new NamedType(KinesisIndexTaskIOConfig.class, SCHEME), new 
NamedType(KinesisSupervisorTuningConfig.class, SCHEME), new NamedType(KinesisSupervisorSpec.class, SCHEME), - new NamedType(KinesisSamplerSpec.class, SCHEME) + new NamedType(KinesisSamplerSpec.class, SCHEME), + new NamedType(KinesisInputFormat.class, SCHEME) ) ); } From 709fd16e59324885a5818358f9558cb0945293cb Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 30 Jul 2024 14:38:11 -0400 Subject: [PATCH 07/17] * improve test coverage --- .../input/kinesis/KinesisInputFormatTest.java | 192 +++++++++++++++++- 1 file changed, 189 insertions(+), 3 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java index d15af6c97aae..6f71798769d7 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -192,6 +193,121 @@ public void testTimestampFromHeader() throws IOException } } + @Test + public void testRawSample() throws IOException + { + KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + 
ImmutableList.of( + "bar", + "foo" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.sample()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRowListPlusRawValues rawValues = iterator.next(); + Assert.assertEquals(1, rawValues.getInputRows().size()); + InputRow row = rawValues.getInputRows().get(0); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals( + String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testProcessesSampleTimestampFromHeader() throws IOException + { + KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("kinesis.newts.timestamp", "iso", null), + new 
DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.sample()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRowListPlusRawValues rawValues = iterator.next(); + Assert.assertEquals(1, rawValues.getInputRows().size()); + InputRow row = rawValues.getInputRows().get(0); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of(String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS)), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + @Test public void testWithMultipleMixedRecordsTimestampFromHeader() throws IOException { @@ -438,7 +554,7 @@ public void testMissingTimestampThrowsException() throws IOException } 
@Test - public void testWithSchemaDiscovery() throws IOException + public void testWithSchemaDiscoveryKinesisTimestampExcluded() throws IOException { KinesisRecordEntity inputEntity = makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); @@ -446,7 +562,10 @@ public void testWithSchemaDiscovery() throws IOException final InputEntityReader reader = format.createReader( new InputRowSchema( new TimestampSpec("timestamp", "iso", null), - DimensionsSpec.builder().useSchemaDiscovery(true).build(), + DimensionsSpec.builder() + .useSchemaDiscovery(true) + .setDimensionExclusions(ImmutableList.of("kinesis.newts.timestamp")) + .build(), ColumnsFilter.all() ), newSettableByteEntity(inputEntity), @@ -461,7 +580,6 @@ public void testWithSchemaDiscovery() throws IOException final InputRow row = iterator.next(); List expectedDimensions = Arrays.asList( "foo", - "kinesis.newts.timestamp", "root_baz", "o", "bar", @@ -503,6 +621,74 @@ public void testWithSchemaDiscovery() throws IOException } } + @Test + public void testWithSchemaDiscoveryTimestampFromHeader() throws IOException + { + KinesisRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_VALUE_BYTES, KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("kinesis.newts.timestamp", "iso", null), + DimensionsSpec.builder() + .useSchemaDiscovery(true) + .build(), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + List expectedDimensions = Arrays.asList( + "foo", + "timestamp", + "root_baz", + "o", + "bar", + "path_omg", + "jq_omg", + "jq_omg2", + "baz", + "root_baz2", + "path_omg2" + ); + Collections.sort(expectedDimensions); + Collections.sort(row.getDimensions()); + Assert.assertEquals( + 
expectedDimensions, + row.getDimensions() + ); + + // Payload verifications + Assert.assertEquals(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE), row.getTimestamp()); + Assert.assertEquals( + String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), + Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) + ); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + @Test public void testValueInCsvFormat() throws IOException { From 2eaf0fe0e86b53cca674ef7d4cdc725a67bc691a Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 30 Jul 2024 17:11:55 -0400 Subject: [PATCH 08/17] * remove references to kafka --- .../apache/druid/data/input/kinesis/KinesisInputReader.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java index 80c358b528d2..cec7a3bc662e 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java @@ 
-95,7 +95,7 @@ public CloseableIterator sample() throws IOException private Map extractHeaders(KinesisRecordEntity record) { final Map mergedHeaderMap = new HashMap<>(); - // Add kafka record timestamp to the mergelist, we will skip record timestamp if the same key exists already in + // Add kinesis record timestamp to the mergelist, we will skip record timestamp if the same key exists already in // the header list mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().getApproximateArrivalTimestamp().getTime()); @@ -112,7 +112,7 @@ private CloseableIterator buildBlendedRows( final HashSet newDimensions = new HashSet<>(r.getDimensions()); final Map event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList); newDimensions.addAll(headerKeyList.keySet()); - // Remove the dummy timestamp added in KafkaInputFormat + // Remove the dummy timestamp added in KinesisInputFormat newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); final DateTime timestamp = MapInputRowParser.parseTimestamp(inputRowSchema.getTimestampSpec(), event); @@ -160,7 +160,7 @@ private CloseableIterator buildBlendedRowsSample( headerKeyList ); newDimensions.addAll(headerKeyList.keySet()); - // Remove the dummy timestamp added in KafkaInputFormat + // Remove the dummy timestamp added in KinesisInputFormat newDimensions.remove(KinesisInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); newInputRows.add( new MapBasedInputRow( From 1dfd25eb06b1790518a7f0392b0d067a0f1c7d98 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 1 Aug 2024 17:06:50 -0400 Subject: [PATCH 09/17] * resolve review comments --- .../druid/data/input/kafka/KafkaRecordEntity.java | 2 -- .../druid/data/input/kinesis/KinesisInputFormat.java | 6 ++++++ .../druid/data/input/kinesis/KinesisInputReader.java | 10 +++++----- .../druid/data/input/kinesis/KinesisRecordEntity.java | 4 +--- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java index 41c2c0a03258..53369cc6ad60 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java @@ -33,8 +33,6 @@ * key, and timestamp. *

* NOTE: Any records with null values will be skipped, even if they contain non-null keys, or headers - *

- * This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions. */ public class KafkaRecordEntity extends ByteEntity { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java index e03d11454ceb..2070f95ff445 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java @@ -19,6 +19,7 @@ package org.apache.druid.data.input.kinesis; +import com.amazonaws.services.kinesis.model.Record; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputEntity; @@ -35,6 +36,11 @@ import java.io.File; import java.util.Objects; +/** + * Kinesis aware InputFormat. Allows for reading kinesis specific values that are stored in the {@link Record}. At + * this time, this input format only supports reading the main record payload ({@link Record#data}) and + * {@link Record#approximateArrivalTimestamp}, but can be extended easily to read other fields. 
+ */ public class KinesisInputFormat implements InputFormat { private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp"; diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java index cec7a3bc662e..6c0ae382121d 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java @@ -83,11 +83,11 @@ public CloseableIterator read() throws IOException public CloseableIterator sample() throws IOException { final KinesisRecordEntity record = source.getEntity(); - InputRowListPlusRawValues keysAndHeader = extractHeaderAndKeysSample(record); + InputRowListPlusRawValues headers = extractHeaderSample(record); if (record.getRecord().getData() != null) { - return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues()); + return buildBlendedRowsSample(valueParser, headers.getRawValues()); } else { - final List rows = Collections.singletonList(keysAndHeader); + final List rows = Collections.singletonList(headers); return CloseableIterators.withEmptyBaggage(rows.iterator()); } } @@ -97,7 +97,7 @@ private Map extractHeaders(KinesisRecordEntity record) final Map mergedHeaderMap = new HashMap<>(); // Add kinesis record timestamp to the mergelist, we will skip record timestamp if the same key exists already in // the header list - mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().getApproximateArrivalTimestamp().getTime()); + mergedHeaderMap.put(timestampColumnName, record.getRecord().getApproximateArrivalTimestamp().getTime()); return mergedHeaderMap; } @@ -129,7 +129,7 @@ private CloseableIterator buildBlendedRows( ); } - private InputRowListPlusRawValues 
extractHeaderAndKeysSample(KinesisRecordEntity record) + private InputRowListPlusRawValues extractHeaderSample(KinesisRecordEntity record) { Map mergedHeaderMap = extractHeaders(record); return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java index 14fb92108a13..409d0a98ef26 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java @@ -32,9 +32,7 @@ * method also allows Kinesis-aware {@link InputFormat} implementations to read the full kinesis record, including * timestamp, encrytion key, patition key, and sequence number *

- * NOTE: Any records with null values will be skipped, even if they contain non-null keys, or headers - *

- * This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions. + * NOTE: Any records with null values will be skipped, even if they contain non-null headers */ public class KinesisRecordEntity extends ByteEntity { From 67f99e97e9c7733b140691fce53d2fff3396fe7c Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 1 Aug 2024 17:08:29 -0400 Subject: [PATCH 10/17] * remove comment --- .../apache/druid/data/input/kinesis/KinesisInputReader.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java index 6c0ae382121d..8faf6d563af9 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java @@ -95,10 +95,7 @@ public CloseableIterator sample() throws IOException private Map extractHeaders(KinesisRecordEntity record) { final Map mergedHeaderMap = new HashMap<>(); - // Add kinesis record timestamp to the mergelist, we will skip record timestamp if the same key exists already in - // the header list mergedHeaderMap.put(timestampColumnName, record.getRecord().getApproximateArrivalTimestamp().getTime()); - return mergedHeaderMap; } From 72580ea8f88170244f69fc166660be53db5f81bf Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 1 Aug 2024 17:17:01 -0400 Subject: [PATCH 11/17] * fix grammer of comment --- .../apache/druid/data/input/kinesis/KinesisInputFormat.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java index 2070f95ff445..7c4dea3b333e 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java @@ -46,8 +46,8 @@ public class KinesisInputFormat implements InputFormat private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp"; public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; - // Since KinesisInputFormat blends data from header, and payload, timestamp spec can be pointing to an attribute within one of these - // 2 sections. To handle scenarios where there is no timestamp value either in payload, we induce an artifical timestamp value + // Since KinesisInputFormat blends data from headers, and payload, timestamp spec can be pointing to an attribute within one of these + // 2 sections. To handle scenarios where there is no timestamp value either in payload or headers, we induce an artifical timestamp value // to avoid unnecessary parser barf out. Users in such situations can use the inputFormat's kinesis record timestamp as its primary timestamp. 
private final TimestampSpec dummyTimestampSpec = new TimestampSpec(DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH); From 4c889a0cae677a2f0039d2d4b46067edb0348529 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 1 Aug 2024 17:19:56 -0400 Subject: [PATCH 12/17] * fix comment again --- .../apache/druid/data/input/kinesis/KinesisInputFormat.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java index 7c4dea3b333e..7d975e239b09 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java @@ -46,8 +46,8 @@ public class KinesisInputFormat implements InputFormat private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp"; public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; - // Since KinesisInputFormat blends data from headers, and payload, timestamp spec can be pointing to an attribute within one of these - // 2 sections. To handle scenarios where there is no timestamp value either in payload or headers, we induce an artifical timestamp value + // Since KinesisInputFormat blends data from record timestamp, and payload, timestamp spec can be pointing to an attribute within one of these + // 2 sections. To handle scenarios where there is no timestamp value either in payload or record timestamp, we induce an artifical timestamp value // to avoid unnecessary parser barf out. Users in such situations can use the inputFormat's kinesis record timestamp as its primary timestamp. 
private final TimestampSpec dummyTimestampSpec = new TimestampSpec(DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH); From e16524b974fda8d709187d5cbb5a96d1f2d7220e Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 1 Aug 2024 17:55:08 -0400 Subject: [PATCH 13/17] * fix comment again --- .../druid/data/input/kinesis/KinesisInputFormat.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java index 7d975e239b09..9124e9e3b10f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java @@ -46,9 +46,10 @@ public class KinesisInputFormat implements InputFormat private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp"; public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; - // Since KinesisInputFormat blends data from record timestamp, and payload, timestamp spec can be pointing to an attribute within one of these - // 2 sections. To handle scenarios where there is no timestamp value either in payload or record timestamp, we induce an artifical timestamp value - // to avoid unnecessary parser barf out. Users in such situations can use the inputFormat's kinesis record timestamp as its primary timestamp. + // Since KinesisInputFormat blends data from record properties, and payload, timestamp spec can be pointing to an + // attribute within one of these 2 sections. To handle scenarios where there is no timestamp value in the payload, we + // induce an artificial timestamp value to avoid unnecessary parser barf out. 
Users in such situations can use the + // inputFormat's kinesis record timestamp as its primary timestamp. private final TimestampSpec dummyTimestampSpec = new TimestampSpec(DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH); private final InputFormat valueFormat; From 517c68ebaea31991faf7bdac5eae9ad26cf51b44 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 1 Aug 2024 18:09:24 -0400 Subject: [PATCH 14/17] * more review comments --- .../org/apache/druid/data/input/kinesis/KinesisInputReader.java | 1 - .../apache/druid/data/input/kinesis/KinesisRecordEntity.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java index 8faf6d563af9..2ea96b7ac153 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java @@ -71,7 +71,6 @@ public CloseableIterator read() throws IOException final KinesisRecordEntity record = source.getEntity(); final Map mergedHeaderMap = extractHeaders(record); - // Ignore tombstone records that have null values. 
if (record.getRecord().getData() != null) { return buildBlendedRows(valueParser, mergedHeaderMap); } else { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java index 409d0a98ef26..a490fd8f4c36 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisRecordEntity.java @@ -32,7 +32,7 @@ * method also allows Kinesis-aware {@link InputFormat} implementations to read the full kinesis record, including * timestamp, encrytion key, patition key, and sequence number *

- * NOTE: Any records with null values will be skipped, even if they contain non-null headers + * NOTE: Any records with null values will be returned as records with just only kinesis properties and no data payload */ public class KinesisRecordEntity extends ByteEntity { From 238f80e4910c4048771d5cd2592ab151236445f1 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 1 Aug 2024 18:37:28 -0400 Subject: [PATCH 15/17] * add partitionKey --- .../input/kinesis/KinesisInputFormat.java | 30 +++++++++++++++---- .../input/kinesis/KinesisInputReader.java | 4 +++ .../input/kinesis/KinesisInputFormatTest.java | 28 ++++++++++++++--- 3 files changed, 53 insertions(+), 9 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java index 9124e9e3b10f..eff2ed6f4aa9 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java @@ -38,29 +38,40 @@ /** * Kinesis aware InputFormat. Allows for reading kinesis specific values that are stored in the {@link Record}. At - * this time, this input format only supports reading the main record payload ({@link Record#data}) and - * {@link Record#approximateArrivalTimestamp}, but can be extended easily to read other fields. + * this time, this input format only supports reading data from the following record components + *

+ * - {@link Record#data} + * - {@link Record#approximateArrivalTimestamp} + * - {@link Record#partitionKey} + *

+ * This class can be extended easily to read other fields available in the kinesis record. */ public class KinesisInputFormat implements InputFormat { private static final String DEFAULT_TIMESTAMP_COLUMN_NAME = "kinesis.timestamp"; - public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; + private static final String DEFAULT_PARTITION_KEY_COLUMN_NAME = "kinesis.partitionKey"; // Since KinesisInputFormat blends data from record properties, and payload, timestamp spec can be pointing to an // attribute within one of these 2 sections. To handle scenarios where there is no timestamp value in the payload, we // induce an artificial timestamp value to avoid unnecessary parser barf out. Users in such situations can use the // inputFormat's kinesis record timestamp as its primary timestamp. + public static final String DEFAULT_AUTO_TIMESTAMP_STRING = "__kif_auto_timestamp"; private final TimestampSpec dummyTimestampSpec = new TimestampSpec(DEFAULT_AUTO_TIMESTAMP_STRING, "auto", DateTimes.EPOCH); private final InputFormat valueFormat; private final String timestampColumnName; + private final String partitionKeyColumnName; public KinesisInputFormat( @JsonProperty("valueFormat") InputFormat valueFormat, + @JsonProperty("partitionKeyColumnName") @Nullable String partitionKeyColumnName, @JsonProperty("timestampColumnName") @Nullable String timestampColumnName ) { this.valueFormat = Preconditions.checkNotNull(valueFormat, "valueFormat must not be null"); + this.partitionKeyColumnName = partitionKeyColumnName != null + ? partitionKeyColumnName + : DEFAULT_PARTITION_KEY_COLUMN_NAME; this.timestampColumnName = timestampColumnName != null ? 
timestampColumnName : DEFAULT_TIMESTAMP_COLUMN_NAME; } @@ -94,6 +105,7 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, temporaryDirectory ), + partitionKeyColumnName, timestampColumnName ); } @@ -111,6 +123,13 @@ public String getTimestampColumnName() return timestampColumnName; } + @Nullable + @JsonProperty + public String getPartitionKeyColumnName() + { + return partitionKeyColumnName; + } + @Override public boolean equals(Object o) { @@ -122,12 +141,13 @@ public boolean equals(Object o) } KinesisInputFormat that = (KinesisInputFormat) o; return Objects.equals(valueFormat, that.valueFormat) - && Objects.equals(timestampColumnName, that.timestampColumnName); + && Objects.equals(timestampColumnName, that.timestampColumnName) + && Objects.equals(partitionKeyColumnName, that.partitionKeyColumnName); } @Override public int hashCode() { - return Objects.hash(valueFormat, timestampColumnName); + return Objects.hash(valueFormat, timestampColumnName, partitionKeyColumnName); } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java index 2ea96b7ac153..d0c30280a2b7 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputReader.java @@ -49,18 +49,21 @@ public class KinesisInputReader implements InputEntityReader private final InputRowSchema inputRowSchema; private final SettableByteEntity source; private final InputEntityReader valueParser; + private final String partitionKeyColumnName; private final String timestampColumnName; public KinesisInputReader( InputRowSchema inputRowSchema, SettableByteEntity source, InputEntityReader valueParser, + String 
partitionKeyColumnName, String timestampColumnName ) { this.inputRowSchema = inputRowSchema; this.source = source; this.valueParser = valueParser; + this.partitionKeyColumnName = partitionKeyColumnName; this.timestampColumnName = timestampColumnName; } @@ -95,6 +98,7 @@ private Map extractHeaders(KinesisRecordEntity record) { final Map mergedHeaderMap = new HashMap<>(); mergedHeaderMap.put(timestampColumnName, record.getRecord().getApproximateArrivalTimestamp().getTime()); + mergedHeaderMap.put(partitionKeyColumnName, record.getRecord().getPartitionKey()); return mergedHeaderMap; } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java index 6f71798769d7..c67915e88d81 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java @@ -65,6 +65,7 @@ public class KinesisInputFormatTest private static final String KINESIS_APPROXIMATE_TIME_DATE = "2024-07-29"; private static final long KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS = DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis(); private static final String DATA_TIMSTAMP_DATE = "2024-07-30"; + private static final String PARTITION_KEY = "partition_key_1"; private static final byte[] SIMPLE_JSON_VALUE_BYTES = StringUtils.toUtf8( TestUtils.singleQuoteToStandardJson( @@ -102,6 +103,7 @@ public void setUp() false, false ), + "kinesis.newts.partitionKey", "kinesis.newts.timestamp" ); } @@ -129,6 +131,7 @@ public void testSerde() throws JsonProcessingException false, false ), + "kinesis.newts.partitionKey", "kinesis.newts.timestamp" ); Assert.assertEquals(format, kif); @@ -175,6 +178,7 @@ public void testTimestampFromHeader() throws 
IOException String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); @@ -231,6 +235,7 @@ public void testRawSample() throws IOException String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); @@ -289,6 +294,7 @@ public void testProcessesSampleTimestampFromHeader() throws IOException String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); @@ -366,6 +372,7 @@ public void testWithMultipleMixedRecordsTimestampFromHeader() throws IOException String.valueOf(DateTimes.of("2024-07-1" + i).getMillis()), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", 
Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); @@ -423,6 +430,7 @@ public void testTimestampFromData() throws IOException String.valueOf(KINESIS_APPROXOIMATE_TIMESTAMP_MILLIS), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); @@ -499,6 +507,7 @@ public void testWithMultipleMixedRecordsTimestampFromData() throws IOException String.valueOf(DateTimes.of("2024-07-29").getMillis()), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); @@ -588,7 +597,8 @@ public void testWithSchemaDiscoveryKinesisTimestampExcluded() throws IOException "jq_omg2", "baz", "root_baz2", - "path_omg2" + "path_omg2", + "kinesis.newts.partitionKey" ); Collections.sort(expectedDimensions); Collections.sort(row.getDimensions()); @@ -603,6 +613,7 @@ public void testWithSchemaDiscoveryKinesisTimestampExcluded() throws IOException String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", 
Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); @@ -656,7 +667,8 @@ public void testWithSchemaDiscoveryTimestampFromHeader() throws IOException "jq_omg2", "baz", "root_baz2", - "path_omg2" + "path_omg2", + "kinesis.newts.partitionKey" ); Collections.sort(expectedDimensions); Collections.sort(row.getDimensions()); @@ -671,6 +683,7 @@ public void testWithSchemaDiscoveryTimestampFromHeader() throws IOException String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); @@ -701,6 +714,7 @@ public void testValueInCsvFormat() throws IOException false, 0 ), + "kinesis.newts.partitionKey", "kinesis.newts.timestamp" ); @@ -715,7 +729,8 @@ public void testValueInCsvFormat() throws IOException ImmutableList.of( "bar", "foo", - "kinesis.newts.timestamp" + "kinesis.newts.timestamp", + "kinesis.newts.partitionKey" ) ) ), @@ -735,7 +750,8 @@ public void testValueInCsvFormat() throws IOException Arrays.asList( "bar", "foo", - "kinesis.newts.timestamp" + "kinesis.newts.timestamp", + "kinesis.newts.partitionKey" ), row.getDimensions() ); @@ -748,6 +764,7 @@ public void testValueInCsvFormat() throws IOException String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", 
Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertTrue(row.getDimension("bar").isEmpty()); @@ -788,6 +805,7 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException "bar", "foo", "kinesis.newts.timestamp", + "kinesis.newts.partitionKey", "root_baz", "o", "path_omg", @@ -810,6 +828,7 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException String.valueOf(DateTimes.of(KINESIS_APPROXIMATE_TIME_DATE).getMillis()), Iterables.getOnlyElement(row.getDimension("kinesis.newts.timestamp")) ); + Assert.assertEquals(PARTITION_KEY, Iterables.getOnlyElement(row.getDimension("kinesis.newts.partitionKey"))); Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); @@ -835,6 +854,7 @@ private KinesisRecordEntity makeInputEntity( return new KinesisRecordEntity( new Record().withData(ByteBuffer.wrap(payload)) .withApproximateArrivalTimestamp(new Date(kinesisTimestampMillis)) + .withPartitionKey(PARTITION_KEY) ); } From 3abd8ade81c08fa7f81c5635302a31c110981ef5 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 1 Aug 2024 18:59:43 -0400 Subject: [PATCH 16/17] * add check for same timestamp and partitionKey column name --- .../input/kinesis/KinesisInputFormat.java | 4 + .../input/kinesis/KinesisInputFormatTest.java | 73 +++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java index eff2ed6f4aa9..7d97fffed378 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java +++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/data/input/kinesis/KinesisInputFormat.java @@ -69,6 +69,10 @@ public KinesisInputFormat( ) { this.valueFormat = Preconditions.checkNotNull(valueFormat, "valueFormat must not be null"); + Preconditions.checkState( + !(timestampColumnName != null && timestampColumnName.equals(partitionKeyColumnName)), + "timestampColumnName and partitionKeyColumnName must be different" + ); this.partitionKeyColumnName = partitionKeyColumnName != null ? partitionKeyColumnName : DEFAULT_PARTITION_KEY_COLUMN_NAME; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java index c67915e88d81..15f012ea6848 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java @@ -28,6 +28,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; import org.apache.druid.data.input.InputRowSchema; @@ -847,6 +848,78 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException } } + @Test + @SuppressWarnings("ResultOfMethodCallIgnored") + public void testValidInputFormatConstruction() throws IOException + { + InputFormat valueFormat = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new 
JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, + false + ); + // null partitionKeyColumnName and null timestampColumnName is valid + new KinesisInputFormat(valueFormat, null, null); + + // non-null partitionKeyColumnName and null timestampColumnName is valid + new KinesisInputFormat(valueFormat, "kinesis.partitionKey", null); + + // null partitionKeyColumnName and non-null timestampColumnName is valid + new KinesisInputFormat(valueFormat, null, "kinesis.timestamp"); + + // non-null partitionKeyColumnName and non-null timestampColumnName is valid + new KinesisInputFormat(valueFormat, "kinesis.partitionKey", "kinesis.timestamp"); + + } + + @Test + @SuppressWarnings("ResultOfMethodCallIgnored") + public void testInvalidInputFormatConstruction() throws IOException + { + // null value format is invalid + Assert.assertThrows( + "valueFormat must not be null", + NullPointerException.class, + () -> new KinesisInputFormat(null, null, null) + ); + + InputFormat valueFormat = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, + false + ); + + // partitionKeyColumnName == timestampColumnName is invalid + Assert.assertThrows( + "timestampColumnName and partitionKeyColumnName must be different", + IllegalStateException.class, + () -> new 
KinesisInputFormat(valueFormat, "kinesis.timestamp", "kinesis.timestamp") + ); + } + private KinesisRecordEntity makeInputEntity( byte[] payload, long kinesisTimestampMillis) From a5bcf7296129986100f61e9020e0b26171e3843f Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 1 Aug 2024 19:29:52 -0400 Subject: [PATCH 17/17] * fix intellij inspection --- .../druid/data/input/kinesis/KinesisInputFormatTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java index 15f012ea6848..130f31681dee 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/data/input/kinesis/KinesisInputFormatTest.java @@ -850,7 +850,7 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException @Test @SuppressWarnings("ResultOfMethodCallIgnored") - public void testValidInputFormatConstruction() throws IOException + public void testValidInputFormatConstruction() { InputFormat valueFormat = new JsonInputFormat( new JSONPathSpec( @@ -885,7 +885,7 @@ public void testValidInputFormatConstruction() throws IOException @Test @SuppressWarnings("ResultOfMethodCallIgnored") - public void testInvalidInputFormatConstruction() throws IOException + public void testInvalidInputFormatConstruction() { // null value format is invalid Assert.assertThrows(