diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java index 34bc3bc1a378..cf780ad3a9c2 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java @@ -48,7 +48,8 @@ public List getJacksonModules() // (Older versions of Druid didn't specify a type name and got this one by default.) new NamedType(KafkaIndexTaskTuningConfig.class, "KafkaTuningConfig"), new NamedType(KafkaSupervisorTuningConfig.class, "kafka"), - new NamedType(KafkaSupervisorSpec.class, "kafka") + new NamedType(KafkaSupervisorSpec.class, "kafka"), + new NamedType(KafkaSamplerSpec.class, "kafka") ) ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java new file mode 100644 index 000000000000..63f519bc8dae --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.overlord.sampler.FirehoseSampler; +import org.apache.druid.indexing.overlord.sampler.SamplerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +import java.util.HashMap; +import java.util.Map; + +/** Sampler spec for Kafka: hands out a {@link KafkaSamplerFirehose} whose records come from a {@link KafkaRecordSupplier} configured from the supervisor spec's consumer properties. */ public class KafkaSamplerSpec extends SeekableStreamSamplerSpec +{ + private final ObjectMapper objectMapper; + + /** Deserialization constructor. {@code ingestionSpec} and {@code samplerConfig} are read from the JSON properties "spec" and "samplerConfig"; {@code firehoseSampler} and {@code objectMapper} are supplied through {@code @JacksonInject}. The first three arguments go to the superclass; the mapper is kept for building the record supplier. */ @JsonCreator + public KafkaSamplerSpec( + @JsonProperty("spec") final KafkaSupervisorSpec ingestionSpec, + @JsonProperty("samplerConfig") final SamplerConfig samplerConfig, + @JacksonInject FirehoseSampler firehoseSampler, + @JacksonInject ObjectMapper objectMapper + ) + { + super(ingestionSpec, samplerConfig, firehoseSampler); + + this.objectMapper = objectMapper; + } + + /** Returns a new {@link KafkaSamplerFirehose} for the given parser. */ @Override + protected Firehose getFirehose(InputRowParser parser) + { + return new KafkaSamplerFirehose(parser); + } + + /** Sampler firehose whose record supplier is a {@link KafkaRecordSupplier}. */ protected class KafkaSamplerFirehose 
extends SeekableStreamSamplerFirehose + { + private KafkaSamplerFirehose(InputRowParser parser) + { + super(parser); + } + + /** Builds the Kafka record supplier. The thread's context class loader is switched to this class's loader while the supplier is constructed and is put back in the {@code finally} block (presumably so Kafka client classes resolve against the extension's loader - TODO confirm). The consumer properties from the IO config are copied, then auto-commit, offset reset, the key/value deserializers and the request timeout are overwritten. NOTE(review): {@code samplerConfig} is dereferenced here without a null check - confirm it can never be null at this point. */ @Override + protected RecordSupplier getRecordSupplier() + { + ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + + final Map props = new HashMap<>(((KafkaSupervisorIOConfig) ioConfig).getConsumerProperties()); + + props.put("enable.auto.commit", "false"); + props.put("auto.offset.reset", "none"); + props.put("key.deserializer", ByteArrayDeserializer.class.getName()); + props.put("value.deserializer", ByteArrayDeserializer.class.getName()); + props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs())); + + return new KafkaRecordSupplier(props, objectMapper); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } + } + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java new file mode 100644 index 000000000000..5820047a8d20 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.curator.test.TestingCluster; +import org.apache.druid.client.cache.MapCache; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.FloatDimensionSchema; +import org.apache.druid.data.input.impl.JSONParseSpec; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.StringInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; +import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; +import org.apache.druid.indexing.kafka.test.TestBroker; +import org.apache.druid.indexing.overlord.sampler.FirehoseSampler; +import org.apache.druid.indexing.overlord.sampler.SamplerCache; +import org.apache.druid.indexing.overlord.sampler.SamplerConfig; +import org.apache.druid.indexing.overlord.sampler.SamplerResponse; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import 
org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** Tests {@link KafkaSamplerSpec#sample()} against a {@link TestBroker} backed by a Curator {@link TestingCluster}; both are started once for the class and shut down afterwards. */ public class KafkaSamplerSpecTest +{ + private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); + private static final String TOPIC = "sampling"; + /** Schema shared by the test: ISO "timestamp" column, dimensions dim1, dim1t, dim2 (string), dimLong (long), dimFloat (float), plus a double sum over met1 and a row count. */ private static final DataSchema DATA_SCHEMA = new DataSchema( + "test_ds", + objectMapper.convertValue( + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim1t"), + new StringDimensionSchema("dim2"), + new LongDimensionSchema("dimLong"), + new FloatDimensionSchema("dimFloat") + ), + null, + null + ), + new JSONPathSpec(true, ImmutableList.of()), + ImmutableMap.of() + ), + StandardCharsets.UTF_8.name() + ), + Map.class + ), + new AggregatorFactory[]{ + new DoubleSumAggregatorFactory("met1sum", "met1"), + new CountAggregatorFactory("rows") + }, + new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + null, + objectMapper + ); + + private static TestingCluster zkServer; + private static TestBroker kafkaServer; + + /** Six records for partition 0 of the topic, all with null keys: three JSON rows, one JSON row with the timestamp 246140482-04-24T15:36:27.903Z, one non-JSON payload and one null value. */ private static List> generateRecords(String topic) + { + return ImmutableList.of( + new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", 
"1.0")), + new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")), + new ProducerRecord<>(topic, 0, null, null) + ); + } + + /** Starts {@code new TestingCluster(1)} and a {@link TestBroker} connected to it with {@code num.partitions=2}. */ @BeforeClass + public static void setupClass() throws Exception + { + zkServer = new TestingCluster(1); + zkServer.start(); + + kafkaServer = new TestBroker(zkServer.getConnectString(), null, 1, ImmutableMap.of("num.partitions", "2")); + kafkaServer.start(); + } + + /** Closes the broker, then stops the ZooKeeper cluster. */ @AfterClass + public static void tearDownClass() throws Exception + { + kafkaServer.close(); + zkServer.stop(); + } + + /** Publishes the generated records, samples the topic with {@code new SamplerConfig(5, null, null, null)} and checks that 5 rows are read, 3 are indexed, and that the response rows match the first five records in order: three parsed rows followed by two rows flagged unparseable. The sixth (null-valued) record is not expected in the response. */ @Test(timeout = 30_000L) + public void testSample() + { + insertData(generateRecords(TOPIC)); + + KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec( + DATA_SCHEMA, + null, + new KafkaSupervisorIOConfig( + TOPIC, + null, + null, + null, + kafkaServer.consumerProperties(), + null, + null, + null, + true, + null, + null, + null + ), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec( + supervisorSpec, + new SamplerConfig(5, null, null, null), + new FirehoseSampler(objectMapper, new SamplerCache(MapCache.create(100000))), + objectMapper + ); + + SamplerResponse response = samplerSpec.sample(); + + Assert.assertNotNull(response.getCacheKey()); + Assert.assertEquals(5, (int) response.getNumRowsRead()); + Assert.assertEquals(3, (int) response.getNumRowsIndexed()); + Assert.assertEquals(5, response.getData().size()); + + Iterator it = response.getData().iterator(); + + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}", + ImmutableMap.builder() + .put("__time", 1199145600000L) + .put("dim1", "a") + .put("dim2", "y") + .put("dimLong", 10L) + .put("dimFloat", 20.0F) + .put("rows", 1L) + .put("met1sum", 1.0) + .build(), + null, + 
null + ), it.next()); + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}", + ImmutableMap.builder() + .put("__time", 1230768000000L) + .put("dim1", "b") + .put("dim2", "y") + .put("dimLong", 10L) + .put("dimFloat", 20.0F) + .put("rows", 1L) + .put("met1sum", 1.0) + .build(), + null, + null + ), it.next()); + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}", + ImmutableMap.builder() + .put("__time", 1262304000000L) + .put("dim1", "c") + .put("dim2", "y") + .put("dimLong", 10L) + .put("dimFloat", 20.0F) + .put("rows", 1L) + .put("met1sum", 1.0) + .build(), + null, + null + ), it.next()); + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "{\"timestamp\":\"246140482-04-24T15:36:27.903Z\",\"dim1\":\"x\",\"dim2\":\"z\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}", + null, + true, + "Timestamp cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]" + ), it.next()); + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "unparseable", + null, + true, + "Unable to parse row [unparseable]" + ), it.next()); + + Assert.assertFalse(it.hasNext()); + } + + /** Sends every record through one producer inside a single transaction; the producer is closed by the try-with-resources statement. */ private static void insertData(List> data) + { + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + kafkaProducer.initTransactions(); + kafkaProducer.beginTransaction(); + + data.forEach(kafkaProducer::send); + + kafkaProducer.commitTransaction(); + } + } + + /** Serializes the six values into a JSON object with the keys timestamp, dim1, dim2, dimLong, dimFloat and met1 and returns its bytes; any exception is rethrown wrapped in a RuntimeException. */ private static byte[] jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1) + { + try { + return new ObjectMapper().writeValueAsBytes( + 
ImmutableMap.builder() + .put("timestamp", timestamp) + .put("dim1", dim1) + .put("dim2", dim2) + .put("dimLong", dimLong) + .put("dimFloat", dimFloat) + .put("met1", met1) + .build() + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java index cba5166f268d..3cea45a14acd 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java @@ -48,7 +48,8 @@ public List getJacksonModules() new NamedType(KinesisDataSourceMetadata.class, "kinesis"), new NamedType(KinesisIndexTaskIOConfig.class, "kinesis"), new NamedType(KinesisSupervisorTuningConfig.class, "kinesis"), - new NamedType(KinesisSupervisorSpec.class, "kinesis") + new NamedType(KinesisSupervisorSpec.class, "kinesis"), + new NamedType(KinesisSamplerSpec.class, "kinesis") ) ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java new file mode 100644 index 000000000000..1fd36f279ad7 --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kinesis; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.inject.name.Named; +import org.apache.druid.common.aws.AWSCredentialsConfig; +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig; +import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec; +import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig; +import org.apache.druid.indexing.overlord.sampler.FirehoseSampler; +import org.apache.druid.indexing.overlord.sampler.SamplerConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; + +/** Sampler spec for Kinesis: hands out a {@link KinesisSamplerFirehose} whose records come from a {@link KinesisRecordSupplier}. */ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec +{ + private final AWSCredentialsConfig awsCredentialsConfig; + + /** Deserialization constructor. {@code ingestionSpec} and {@code samplerConfig} are read from the JSON properties "spec" and "samplerConfig"; {@code firehoseSampler} and the credentials config named "kinesis" are supplied through {@code @JacksonInject}. The first three arguments go to the superclass; the credentials config is kept for creating the Kinesis client. */ @JsonCreator + public KinesisSamplerSpec( + @JsonProperty("spec") final KinesisSupervisorSpec ingestionSpec, + @JsonProperty("samplerConfig") final SamplerConfig samplerConfig, + @JacksonInject FirehoseSampler firehoseSampler, + @JacksonInject @Named("kinesis") AWSCredentialsConfig awsCredentialsConfig + ) + { + super(ingestionSpec, 
samplerConfig, firehoseSampler); + + this.awsCredentialsConfig = awsCredentialsConfig; + } + + /** Returns a new {@link KinesisSamplerFirehose} for the given parser. */ @Override + protected Firehose getFirehose(InputRowParser parser) + { + return new KinesisSamplerFirehose(parser); + } + + /** Sampler firehose whose record supplier is a {@link KinesisRecordSupplier}. */ protected class KinesisSamplerFirehose extends SeekableStreamSamplerFirehose + { + protected KinesisSamplerFirehose(InputRowParser parser) + { + super(parser); + } + + /** Builds a {@link KinesisRecordSupplier} from the enclosing spec's IO and tuning configs (cast to their Kinesis types) and a client created from the endpoint, the injected credentials config, the assumed-role ARN and the external id. The local {@code ioConfig} and {@code tuningConfig} shadow the enclosing spec's fields of the same name. NOTE(review): the literal 1 is the fourth constructor argument and its meaning is not visible here - confirm against KinesisRecordSupplier. NOTE(review): {@code tuningConfig} is dereferenced without a null check - confirm the supervisor spec always provides one. */ @Override + protected RecordSupplier getRecordSupplier() + { + KinesisSupervisorIOConfig ioConfig = (KinesisSupervisorIOConfig) KinesisSamplerSpec.this.ioConfig; + KinesisSupervisorTuningConfig tuningConfig = ((KinesisSupervisorTuningConfig) KinesisSamplerSpec.this.tuningConfig); + + return new KinesisRecordSupplier( + KinesisRecordSupplier.getAmazonKinesisClient( + ioConfig.getEndpoint(), + awsCredentialsConfig, + ioConfig.getAwsAssumedRoleArn(), + ioConfig.getAwsExternalId() + ), + ioConfig.getRecordsPerFetch(), + ioConfig.getFetchDelayMillis(), + 1, + ioConfig.isDeaggregate(), + tuningConfig.getRecordBufferSize(), + tuningConfig.getRecordBufferOfferTimeout(), + tuningConfig.getRecordBufferFullWait(), + tuningConfig.getFetchSequenceNumberTimeout(), + tuningConfig.getMaxRecordsPerPoll() + ); + } + } +} diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java new file mode 100644 index 000000000000..62b1226466fa --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kinesis; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.client.cache.MapCache; +import org.apache.druid.common.aws.AWSCredentialsConfig; +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.FloatDimensionSchema; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JSONParseSpec; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.StringInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig; +import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec; +import org.apache.druid.indexing.overlord.sampler.FirehoseSampler; +import org.apache.druid.indexing.overlord.sampler.SamplerCache; +import org.apache.druid.indexing.overlord.sampler.SamplerConfig; +import org.apache.druid.indexing.overlord.sampler.SamplerResponse; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import 
org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.easymock.EasyMockSupport; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.easymock.EasyMock.anyLong; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; + +/** Tests {@link KinesisSamplerSpec#sample()} with an EasyMock {@link KinesisRecordSupplier} standing in for a real stream. */ public class KinesisSamplerSpecTest extends EasyMockSupport +{ + private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); + private static final String STREAM = "sampling"; + private static final String SHARD_ID = "1"; + /** Schema shared by the test: ISO "timestamp" column, dimensions dim1, dim1t, dim2 (string), dimLong (long), dimFloat (float), plus a double sum over met1 and a row count. */ private static final DataSchema DATA_SCHEMA = new DataSchema( + "test_ds", + objectMapper.convertValue( + new StringInputRowParser( + new JSONParseSpec( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim1t"), + new StringDimensionSchema("dim2"), + new LongDimensionSchema("dimLong"), + new FloatDimensionSchema("dimFloat") + ), + null, + null + ), + new JSONPathSpec(true, ImmutableList.of()), + ImmutableMap.of() + ), + StandardCharsets.UTF_8.name() + ), + Map.class + ), + new AggregatorFactory[]{ + new 
DoubleSumAggregatorFactory("met1sum", "met1"), + new CountAggregatorFactory("rows") + }, + new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + null, + objectMapper + ); + + /** Mock supplier handed to the firehose by {@code TestableKinesisSamplerSpec}. */ private final KinesisRecordSupplier recordSupplier = mock(KinesisRecordSupplier.class); + + /** Six records on partition "1" with sequence numbers 0, 1, 2, 5, 6 and 8: three JSON rows, one JSON row with the timestamp 246140482-04-24T15:36:27.903Z, one non-JSON payload and one empty JSON object. */ private static List> generateRecords(String stream) + { + return ImmutableList.of( + new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")), + new OrderedPartitionableRecord<>( + stream, + "1", + "5", + jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0") + ), + new OrderedPartitionableRecord<>( + stream, + "1", + "6", + Collections.singletonList(StringUtils.toUtf8("unparseable")) + ), + new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))) + ); + } + + /** Records the expected supplier interaction (getPartitionIds, assign, seekToEarliest, poll and close, each exactly once), samples with {@code new SamplerConfig(5, null, null, null)}, verifies the mock and checks that 5 rows are read, 3 are indexed, and that the response rows match the first five records in order: three parsed rows followed by two rows flagged unparseable. */ @Test(timeout = 10_000L) + public void testSample() throws Exception + { + expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once(); + + recordSupplier.assign(ImmutableSet.of(StreamPartition.of(STREAM, SHARD_ID))); + expectLastCall().once(); + + recordSupplier.seekToEarliest(ImmutableSet.of(StreamPartition.of(STREAM, SHARD_ID))); + expectLastCall().once(); + + expect(recordSupplier.poll(anyLong())).andReturn(generateRecords(STREAM)).once(); + + recordSupplier.close(); + expectLastCall().once(); + + replayAll(); + + KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec( + DATA_SCHEMA, + null, + new KinesisSupervisorIOConfig( + STREAM, + null, + null, + null, + null, + null, + null, + null, + true, + null, + null, + null, + null, + null, + null, + null, + false + ), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + KinesisSamplerSpec samplerSpec = new 
TestableKinesisSamplerSpec( + supervisorSpec, + new SamplerConfig(5, null, null, null), + new FirehoseSampler(objectMapper, new SamplerCache(MapCache.create(100000))), + null + ); + + SamplerResponse response = samplerSpec.sample(); + + verifyAll(); + + Assert.assertNotNull(response.getCacheKey()); + Assert.assertEquals(5, (int) response.getNumRowsRead()); + Assert.assertEquals(3, (int) response.getNumRowsIndexed()); + Assert.assertEquals(5, response.getData().size()); + + Iterator it = response.getData().iterator(); + + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}", + ImmutableMap.builder() + .put("__time", 1199145600000L) + .put("dim1", "a") + .put("dim2", "y") + .put("dimLong", 10L) + .put("dimFloat", 20.0F) + .put("rows", 1L) + .put("met1sum", 1.0) + .build(), + null, + null + ), it.next()); + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}", + ImmutableMap.builder() + .put("__time", 1230768000000L) + .put("dim1", "b") + .put("dim2", "y") + .put("dimLong", 10L) + .put("dimFloat", 20.0F) + .put("rows", 1L) + .put("met1sum", 1.0) + .build(), + null, + null + ), it.next()); + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}", + ImmutableMap.builder() + .put("__time", 1262304000000L) + .put("dim1", "c") + .put("dim2", "y") + .put("dimLong", 10L) + .put("dimFloat", 20.0F) + .put("rows", 1L) + .put("met1sum", 1.0) + .build(), + null, + null + ), it.next()); + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "{\"timestamp\":\"246140482-04-24T15:36:27.903Z\",\"dim1\":\"x\",\"dim2\":\"z\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}", + null, + true, + "Timestamp 
cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]" + ), it.next()); + Assert.assertEquals(new SamplerResponse.SamplerResponseRow( + "unparseable", + null, + true, + "Unable to parse row [unparseable]" + ), it.next()); + + Assert.assertFalse(it.hasNext()); + } + + /** Serializes the six values into a JSON object with the keys timestamp, dim1, dim2, dimLong, dimFloat and met1 and returns its bytes wrapped in a singleton list; any exception is rethrown wrapped in a RuntimeException. */ private static List jb(String ts, String dim1, String dim2, String dimLong, String dimFloat, String met1) + { + try { + return Collections.singletonList(new ObjectMapper().writeValueAsBytes( + ImmutableMap.builder() + .put("timestamp", ts) + .put("dim1", dim1) + .put("dim2", dim2) + .put("dimLong", dimLong) + .put("dimFloat", dimFloat) + .put("met1", met1) + .build() + )); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** Spec whose firehose returns the test's mock record supplier, so the production {@code getRecordSupplier()} (which creates a Kinesis client) is never called. */ private class TestableKinesisSamplerSpec extends KinesisSamplerSpec + { + private TestableKinesisSamplerSpec( + KinesisSupervisorSpec ingestionSpec, + SamplerConfig samplerConfig, + FirehoseSampler firehoseSampler, + AWSCredentialsConfig awsCredentialsConfig + ) + { + super(ingestionSpec, samplerConfig, firehoseSampler, awsCredentialsConfig); + } + + @Override + protected Firehose getFirehose(InputRowParser parser) + { + return new KinesisSamplerFirehose(parser) + { + @Override + protected RecordSupplier getRecordSupplier() + { + return recordSupplier; + } + }; + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java new file mode 100644 index 000000000000..c7303bc55cfa --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.Firehose; +import org.apache.druid.data.input.FirehoseFactory; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowPlusRaw; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.StringInputRowParser; +import org.apache.druid.indexing.overlord.sampler.FirehoseSampler; +import org.apache.druid.indexing.overlord.sampler.SamplerConfig; +import org.apache.druid.indexing.overlord.sampler.SamplerException; +import org.apache.druid.indexing.overlord.sampler.SamplerResponse; +import org.apache.druid.indexing.overlord.sampler.SamplerSpec; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; 
+import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.utils.Runnables; + +import javax.annotation.Nullable; +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** Base sampler spec for seekable-stream supervisors. Keeps the data schema, IO config and tuning config taken from the supervisor spec and delegates sampling to the {@link FirehoseSampler}, feeding it a firehose obtained from {@link #getFirehose}. */ public abstract class SeekableStreamSamplerSpec implements SamplerSpec +{ + /** Timeout, in milliseconds, passed to each {@code RecordSupplier.poll} call. */ private static final int POLL_TIMEOUT_MS = 100; + + private final DataSchema dataSchema; + private final FirehoseSampler firehoseSampler; + + protected final SeekableStreamSupervisorIOConfig ioConfig; + protected final SeekableStreamSupervisorTuningConfig tuningConfig; + protected final SamplerConfig samplerConfig; + + /** Extracts the data schema, IO config and tuning config from the supervisor spec. Throws NullPointerException (via Preconditions.checkNotNull) when the spec or its IO config is null; the tuning config and sampler config are stored without a null check. */ public SeekableStreamSamplerSpec( + final SeekableStreamSupervisorSpec ingestionSpec, + final SamplerConfig samplerConfig, + final FirehoseSampler firehoseSampler + ) + { + this.dataSchema = Preconditions.checkNotNull(ingestionSpec, "[spec] is required").getDataSchema(); + this.ioConfig = Preconditions.checkNotNull(ingestionSpec.getIoConfig(), "[spec.ioConfig] is required"); + this.tuningConfig = ingestionSpec.getTuningConfig(); + this.samplerConfig = samplerConfig; + this.firehoseSampler = firehoseSampler; + } + + /** Runs the firehose sampler with a factory whose {@code connect} returns {@link #getFirehose} for the supplied parser; the temporary directory argument is ignored. */ @Override + public SamplerResponse sample() + { + return firehoseSampler.sample( + new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser, @Nullable File temporaryDirectory) + { + return getFirehose(parser); + } + }, + dataSchema, + samplerConfig + ); + } + + /** Creates the stream-specific firehose that will read records through the given parser. */ protected abstract Firehose getFirehose(InputRowParser parser); + + /** Firehose that polls a {@link RecordSupplier} and parses each record payload with the configured parser. Subclasses provide the supplier through {@link #getRecordSupplier()}. */ protected abstract class SeekableStreamSamplerFirehose implements Firehose + { + private final InputRowParser parser; + private final RecordSupplier recordSupplier; + + private Iterator> recordIterator; + private Iterator recordDataIterator; + + private volatile boolean closed = false; + + /** Obtains the record supplier, then assigns and seeks all partitions of the stream. NOTE: {@link #getRecordSupplier()} is invoked from this constructor, before any subclass field initializers have run, so overrides may rely only on enclosing-instance state. If seeking fails the supplier is closed before the exception propagates, because the caller never receives this firehose and could not close it. Throws SamplerException when the thread is interrupted while seeking; the interrupt status is restored first. */ protected SeekableStreamSamplerFirehose(InputRowParser parser) + { 
+ this.parser = parser; + + if (parser instanceof StringInputRowParser) { + ((StringInputRowParser) parser).startFileFromBeginning(); + } + + this.recordSupplier = getRecordSupplier(); + + try { + assignAndSeek(); + } + catch (InterruptedException e) { + /* Catching InterruptedException clears the interrupt status; set it again so code further up the stack can still see the interruption. */ + Thread.currentThread().interrupt(); + closeSupplierAfterFailedSeek(); + throw new SamplerException(e, "Exception while seeking to partitions"); + } + catch (RuntimeException e) { + /* Same cleanup for unchecked failures; the original exception is rethrown unchanged. */ + closeSupplierAfterFailedSeek(); + throw e; + } + } + + /** Releases the record supplier when construction fails after the supplier was obtained, and marks the firehose closed. */ private void closeSupplierAfterFailedSeek() + { + closed = true; + try { + recordSupplier.close(); + } + catch (RuntimeException ignored) { + /* Deliberately dropped: the seek failure that is already propagating is the error the caller needs to see. */ + } + } + + /** Reports true until {@link #close()} has been called; it does not look at whether the stream has more data. */ @Override + public boolean hasMore() + { + return !closed; + } + + /** Returns the row from {@link #nextRowWithRaw()}, rethrowing its parse exception if one was captured. */ @Nullable + @Override + public InputRow nextRow() + { + InputRowPlusRaw row = nextRowWithRaw(); + if (row.getParseException() != null) { + throw row.getParseException(); + } + + return row.getInputRow(); + } + + /** Returns the next payload parsed into a row together with its raw bytes. When the current record's data is exhausted the next record is taken, polling the supplier (with POLL_TIMEOUT_MS) if no records are buffered. A result with null row and null raw bytes is returned when the poll yields nothing or the record carries no data. Only the first row produced by {@code parseBatch} is returned; a ParseException is captured in the result instead of being thrown. */ @Override + public InputRowPlusRaw nextRowWithRaw() + { + if (recordDataIterator == null || !recordDataIterator.hasNext()) { + if (recordIterator == null || !recordIterator.hasNext()) { + recordIterator = recordSupplier.poll(POLL_TIMEOUT_MS).iterator(); + + if (!recordIterator.hasNext()) { + return InputRowPlusRaw.of((InputRow) null, null); + } + } + + recordDataIterator = recordIterator.next().getData().iterator(); + + if (!recordDataIterator.hasNext()) { + return InputRowPlusRaw.of((InputRow) null, null); + } + } + + byte[] raw = recordDataIterator.next(); + + try { + List rows = parser.parseBatch(ByteBuffer.wrap(raw)); + return InputRowPlusRaw.of(rows.isEmpty() ? 
null : rows.get(0), raw); + } + catch (ParseException e) { + return InputRowPlusRaw.of(raw, e); + } + } + + /** Sampling never commits offsets, so this returns a no-op runnable. */ @Override + public Runnable commit() + { + return Runnables.getNoopRunnable(); + } + + /** Closes the record supplier the first time it is called; later calls return immediately. */ @Override + public void close() + { + if (closed) { + return; + } + + closed = true; + recordSupplier.close(); + } + + /** Assigns every partition id the supplier reports for the configured stream, then seeks all of them to the earliest or latest position according to {@code ioConfig.isUseEarliestSequenceNumber()}. */ private void assignAndSeek() throws InterruptedException + { + final Set> partitions = recordSupplier + .getPartitionIds(ioConfig.getStream()) + .stream() + .map(x -> StreamPartition.of(ioConfig.getStream(), x)) + .collect(Collectors.toSet()); + + recordSupplier.assign(partitions); + + if (ioConfig.isUseEarliestSequenceNumber()) { + recordSupplier.seekToEarliest(partitions); + } else { + recordSupplier.seekToLatest(partitions); + } + } + + /** Creates the stream-specific record supplier. Called once, from the constructor. */ protected abstract RecordSupplier getRecordSupplier(); + } +}