From 9988d8f0d14bd2fc0fdc374ca416853af7508c16 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 28 Oct 2024 11:41:39 -0500 Subject: [PATCH 01/12] kafka-indexing: Report consumer io time --- .../indexing/kafka/KafkaConsumerMonitor.java | 9 ++++- .../druid/indexing/kafka/KafkaIndexTask.java | 3 +- .../indexing/kafka/KafkaRecordSupplier.java | 11 ++++-- .../indexing/kafka/KafkaSamplerSpec.java | 2 +- .../kafka/supervisor/KafkaSupervisor.java | 3 +- .../kafka/KafkaRecordSupplierTest.java | 38 +++++++++++-------- .../kafka/supervisor/KafkaSupervisorTest.java | 2 +- 7 files changed, 43 insertions(+), 25 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java index 6a8c6c149051..4ed2493da550 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java @@ -43,16 +43,20 @@ public class KafkaConsumerMonitor extends AbstractMonitor ImmutableMap.builder() .put("bytes-consumed-total", "kafka/consumer/bytesConsumed") .put("records-consumed-total", "kafka/consumer/recordsConsumed") + .put("io-wait-time-ns-total", "kafka/consumer/io/time") .build(); private static final String TOPIC_TAG = "topic"; + private static final String DATASOURCE_TAG = "datasource"; private static final Set TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG); private final KafkaConsumer consumer; + private final String datasource; private final Map counters = new HashMap<>(); - public KafkaConsumerMonitor(final KafkaConsumer consumer) + public KafkaConsumerMonitor(final KafkaConsumer consumer, String datasource) { this.consumer = consumer; + this.datasource = datasource; } @Override @@ -71,6 +75,9 @@ public boolean doMonitor(final ServiceEmitter emitter) if (newValue != priorValue) { final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic); + if (datasource != null) { + builder.setDimension(DATASOURCE_TAG, datasource); + } emitter.emit(builder.setMetric(METRICS.get(metricName.name()), newValue - priorValue)); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index e5ff77467cdb..2834f2d31ce5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -111,7 +111,8 @@ protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) final KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(), - kafkaIndexTaskIOConfig.isMultiTopic()); + kafkaIndexTaskIOConfig.isMultiTopic(), dataSchema.getDataSource() + ); if (toolbox.getMonitorScheduler() != null) { toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor()); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index a88300552e2a..5b8ea4f410e0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.serialization.Deserializer; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Type; @@ -78,21 +79,23 @@ public KafkaRecordSupplier( Map consumerProperties, ObjectMapper sortingMapper, KafkaConfigOverrides configOverrides, - boolean multiTopic + boolean multiTopic, + @Nullable String datasource ) { - this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic); + this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic, datasource); } @VisibleForTesting public KafkaRecordSupplier( KafkaConsumer consumer, - boolean multiTopic + boolean multiTopic, + @Nullable String datasource ) { this.consumer = consumer; this.multiTopic = multiTopic; - this.monitor = new KafkaConsumerMonitor(consumer); + this.monitor = new KafkaConsumerMonitor(consumer, datasource); } @Override diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java index e0683fb605bb..e941bff3885f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java @@ -74,7 +74,7 @@ protected KafkaRecordSupplier createRecordSupplier() KafkaSupervisorIOConfig kafkaSupervisorIOConfig = (KafkaSupervisorIOConfig) ioConfig; return new KafkaRecordSupplier(props, objectMapper, kafkaSupervisorIOConfig.getConfigOverrides(), - kafkaSupervisorIOConfig.isMultiTopic() + kafkaSupervisorIOConfig.isMultiTopic(), null ); } finally { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index aebacecff662..9e1e09850db0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -137,7 +137,8 @@ protected RecordSupplier setupReco spec.getIoConfig().getConsumerProperties(), sortingMapper, spec.getIoConfig().getConfigOverrides(), - spec.getIoConfig().isMultiTopic() + spec.getIoConfig().isMultiTopic(), + spec.getId() ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index bfd81464ba2f..a12357f9dfc5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -73,6 +73,7 @@ public class KafkaRecordSupplierTest private static KafkaTopicPartition PARTITION_1 = new KafkaTopicPartition(false, null, 1); private static String TOPIC = "topic"; + private static String STREAM = "stream"; private static int TOPIC_POS_FIX = 0; private static TestingCluster ZK_SERVER; private static TestBroker KAFKA_SERVER; @@ -239,7 +240,7 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -264,7 +265,7 @@ public void testMultiTopicSupplierSetup() throws ExecutionException, Interrupted insertData(); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true, STREAM); String stream = Pattern.quote(TOPIC) + "|" + Pattern.quote(otherTopic); Set partitions = recordSupplier.getPartitionIds(stream); @@ -300,7 +301,8 @@ public void testSupplierSetupCustomDeserializer() throws ExecutionException, Int properties, OBJECT_MAPPER, null, - false + false, + STREAM ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -328,7 +330,8 @@ public void testSupplierSetupCustomDeserializerRequiresParameter() properties, OBJECT_MAPPER, null, - false + false, + STREAM ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated @@ -347,7 +350,8 @@ public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt() properties, OBJECT_MAPPER, null, - false + false, + STREAM ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated @@ -374,7 +378,8 @@ public void testPollCustomDeserializer() throws InterruptedException, ExecutionE properties, OBJECT_MAPPER, null, - false + false, + STREAM ); recordSupplier.assign(partitions); @@ -412,7 +417,8 @@ public void testPoll() throws InterruptedException, ExecutionException KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, - false + false, + STREAM ); final Monitor monitor = recordSupplier.monitor(); @@ -463,7 +469,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -533,7 +539,7 @@ public void testSeek() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -576,7 +582,7 @@ public void testSeekToLatest() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -609,7 +615,7 @@ public void testSeekUnassigned() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); @@ -635,7 +641,7 @@ public void testPosition() throws ExecutionException, InterruptedException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -670,7 +676,7 @@ public void testPosition() throws ExecutionException, InterruptedException public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -682,7 +688,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShoul public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -694,7 +700,7 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetSho public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -706,7 +712,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldR public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 7926a0568fd9..968378894964 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -5777,7 +5777,7 @@ protected RecordSupplier setupReco Deserializer valueDeserializerObject = new ByteArrayDeserializer(); return new KafkaRecordSupplier( new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject), - getIoConfig().isMultiTopic() + getIoConfig().isMultiTopic(), null ); } From 1377975cfe639dd602771f696d827e252f6bb96c Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Sun, 3 Nov 2024 11:18:01 -0600 Subject: [PATCH 02/12] commit --- .../RabbitStreamIndexTaskIOConfig.java | 9 +- .../supervisor/RabbitStreamSupervisor.java | 5 +- .../kafka/KafkaIndexTaskIOConfig.java | 13 +- .../kafka/supervisor/KafkaSupervisor.java | 3 +- .../indexing/kafka/KafkaIndexTaskTest.java | 160 ++++++++---- .../kafka/supervisor/KafkaSupervisorTest.java | 7 +- .../kinesis/KinesisIndexTaskIOConfig.java | 13 +- .../kinesis/supervisor/KinesisSupervisor.java | 3 +- .../indexing/kinesis/KinesisIOConfigTest.java | 4 +- .../kinesis/KinesisIndexTaskSerdeTest.java | 4 +- .../kinesis/KinesisIndexTaskTest.java | 16 +- .../supervisor/KinesisSupervisorTest.java | 4 +- .../SeekableStreamIndexTask.java | 27 -- .../SeekableStreamIndexTaskIOConfig.java | 11 +- .../SeekableStreamIndexTaskRunner.java | 60 ++++- ...SeekableStreamIndexTaskRunnerAuthTest.java | 4 +- .../SeekableStreamIndexTaskRunnerTest.java | 241 ++++++++++++++++++ .../SeekableStreamSupervisorSpecTest.java | 3 +- .../SeekableStreamSupervisorStateTest.java | 6 +- 19 files changed, 483 insertions(+), 110 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java index 4c053d89455d..8739fbed9b55 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.joda.time.DateTime; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Map; @@ -53,7 +54,9 @@ public RabbitStreamIndexTaskIOConfig( @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, - @JsonProperty("uri") String uri) + @JsonProperty("uri") String uri, + @JsonProperty("taskDuration") Duration taskDuration + ) { super( taskGroupId, @@ -63,7 +66,9 @@ public RabbitStreamIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat); + inputFormat, + taskDuration + ); this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; this.uri = uri; diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index 222022e3e66f..d0c1045141ca 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -59,7 +59,6 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -202,7 +201,9 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat(), - rabbitConfig.getUri()); + rabbitConfig.getUri(), + ioConfig.getTaskDuration() + ); } @Override diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index 82c9ad71c973..3d21d2d8aa46 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides; import org.joda.time.DateTime; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Map; @@ -63,7 +64,8 @@ public KafkaIndexTaskIOConfig( @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides, - @JsonProperty("multiTopic") @Nullable Boolean multiTopic + @JsonProperty("multiTopic") @Nullable Boolean multiTopic, + @JsonProperty("taskDuration") Duration taskDuration ) { super( @@ -76,7 +78,8 @@ public KafkaIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat + inputFormat, + taskDuration ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); @@ -107,7 +110,8 @@ public KafkaIndexTaskIOConfig( DateTime minimumMessageTime, DateTime maximumMessageTime, InputFormat inputFormat, - KafkaConfigOverrides configOverrides + KafkaConfigOverrides configOverrides, + Duration taskDuration ) { this( @@ -124,7 +128,8 @@ public KafkaIndexTaskIOConfig( maximumMessageTime, inputFormat, configOverrides, - KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC + KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC, + taskDuration ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 9e1e09850db0..cff3e508aa0c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -219,7 +219,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( maximumMessageTime, ioConfig.getInputFormat(), kafkaIoConfig.getConfigOverrides(), - kafkaIoConfig.isMultiTopic() + kafkaIoConfig.isMultiTopic(), + ioConfig.getTaskDuration() ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 23bdeb14acb8..854d9234ced9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -120,6 +120,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -357,7 +358,8 @@ public void testRunAfterDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); Assert.assertTrue(task.supportsQueries()); @@ -413,7 +415,8 @@ public void testIngestNullColumnAfterDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -461,7 +464,8 @@ public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNot null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false); @@ -496,7 +500,8 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception null, null, null, - null + null, + Duration.standardHours(2) ) ); @@ -537,7 +542,8 @@ public void testRunBeforeDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -588,7 +594,8 @@ public void testRunAfterDataInsertedLiveReport() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -665,7 +672,8 @@ public void testIncrementalHandOff() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -768,7 +776,8 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -894,7 +903,8 @@ public void testTimeBasedIncrementalHandOff() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -972,7 +982,8 @@ public void testCheckpointResetWithSameEndOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -1035,7 +1046,8 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask staleReplica = createTask( @@ -1051,7 +1063,8 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1117,7 +1130,8 @@ public void testRunWithMinimumMessageTime() throws Exception DateTimes.of("2010"), null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1165,7 +1179,8 @@ public void testRunWithMaximumMessageTime() throws Exception null, DateTimes.of("2010"), INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1222,7 +1237,8 @@ public void testRunWithTransformSpec() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1293,7 +1309,8 @@ public void testKafkaRecordEntityInputFormat() throws Exception null, null, new TestKafkaInputFormat(INPUT_FORMAT), - null + null, + Duration.standardHours(2) ) ); Assert.assertTrue(task.supportsQueries()); @@ -1365,7 +1382,8 @@ public void testKafkaInputFormat() throws Exception null, null, KAFKA_INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); Assert.assertTrue(task.supportsQueries()); @@ -1416,7 +1434,8 @@ public void testRunOnNothing() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1451,7 +1470,8 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1497,7 +1517,8 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1548,7 +1569,8 @@ public void testReportParseExceptions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1586,7 +1608,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1678,7 +1701,8 @@ public void testMultipleParseExceptionsFailure() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1744,7 +1768,8 @@ public void testRunReplicas() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask task2 = createTask( @@ -1760,7 +1785,8 @@ public void testRunReplicas() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1808,7 +1834,8 @@ public void testRunConflicting() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask task2 = createTask( @@ -1824,7 +1851,8 @@ public void testRunConflicting() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1874,7 +1902,8 @@ public void testRunConflictingWithoutTransactions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask task2 = createTask( @@ -1890,7 +1919,8 @@ public void testRunConflictingWithoutTransactions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1938,7 +1968,8 @@ public void testRunOneTaskTwoPartitions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -1984,7 +2015,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final KafkaIndexTask task2 = createTask( @@ -2000,7 +2032,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2050,7 +2083,8 @@ public void testRestore() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2084,7 +2118,8 @@ public void testRestore() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2134,7 +2169,8 @@ public void testRestoreAfterPersistingSequences() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2177,7 +2213,8 @@ public void testRestoreAfterPersistingSequences() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2228,7 +2265,8 @@ public void testRunWithPauseAndResume() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2299,7 +2337,8 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2336,7 +2375,8 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2383,7 +2423,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ), context ); @@ -2427,7 +2468,8 @@ public void testRunWithDuplicateRequest() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2466,7 +2508,8 @@ public void testRunTransactionModeRollback() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2547,7 +2590,8 @@ public void testRunUnTransactionMode() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2607,7 +2651,8 @@ public void testCanStartFromLaterThanEarliestOffset() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); final ListenableFuture future = runTask(task); @@ -2630,7 +2675,8 @@ public void testRunWithoutDataInserted() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2676,7 +2722,8 @@ public void testSerde() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2708,7 +2755,8 @@ public void testCorrectInputSources() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -2970,7 +3018,8 @@ public void testMultipleLinesJSONText() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -3031,7 +3080,8 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2) ) ); @@ -3103,7 +3153,8 @@ public void testNoParseExceptionsTaskSucceeds() throws Exception null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2) ) ); @@ -3177,7 +3228,8 @@ public void testParseExceptionsBeyondThresholdTaskFails() throws Exception null, null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), - null + null, + Duration.standardHours(2) ) ); @@ -3229,7 +3281,8 @@ public void testCompletionReportPartitionStats() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); @@ -3283,7 +3336,8 @@ public void testCompletionReportMultiplePartitionStats() throws Exception null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ) ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 968378894964..e04b1f030f8b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -107,6 +107,7 @@ import org.easymock.EasyMockSupport; import org.easymock.IAnswer; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -496,7 +497,8 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, null, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ), new KafkaIndexTaskTuningConfig( null, @@ -5641,7 +5643,8 @@ private KafkaIndexTask createKafkaIndexTask( minimumMessageTime, maximumMessageTime, INPUT_FORMAT, - null + null, + Duration.standardHours(2) ), Collections.emptyMap(), OBJECT_MAPPER diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 881d68ba8968..065bf9636ce8 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.joda.time.DateTime; +import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Set; @@ -78,7 +79,8 @@ public KinesisIndexTaskIOConfig( @JsonProperty("endpoint") String endpoint, @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, - @JsonProperty("awsExternalId") String awsExternalId + @JsonProperty("awsExternalId") String awsExternalId, + @JsonProperty("taskDuration") Duration taskDuration ) { super( @@ -89,7 +91,8 @@ public KinesisIndexTaskIOConfig( useTransaction, minimumMessageTime, maximumMessageTime, - inputFormat + inputFormat, + taskDuration ); Preconditions.checkArgument( getEndSequenceNumbers().getPartitionSequenceNumberMap() @@ -117,7 +120,8 @@ public KinesisIndexTaskIOConfig( String endpoint, Integer fetchDelayMillis, String awsAssumedRoleArn, - String awsExternalId + String awsExternalId, + Duration taskDuration ) { this( @@ -135,7 +139,8 @@ public KinesisIndexTaskIOConfig( endpoint, fetchDelayMillis, awsAssumedRoleArn, - awsExternalId + awsExternalId, + taskDuration ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 2f00c8c16cc9..16ff751a4891 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -145,7 +145,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ioConfig.getEndpoint(), ioConfig.getFetchDelayMillis(), ioConfig.getAwsAssumedRoleArn(), - ioConfig.getAwsExternalId() + ioConfig.getAwsExternalId(), + ioConfig.getTaskDuration() ); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java index 3162b2ea0eee..a917d9a43a9c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.indexing.IOConfig; import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -267,7 +268,8 @@ public void testDeserializeToOldIoConfig() throws IOException "endpoint", 2000, "awsAssumedRoleArn", - "awsExternalId" + "awsExternalId", + Duration.standardHours(2) ); final byte[] json = mapper.writeValueAsBytes(currentConfig); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index e84581af6013..5648255deddf 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -40,6 +40,7 @@ import org.apache.druid.server.security.Resource; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -94,7 +95,8 @@ public class KinesisIndexTaskSerdeTest "endpoint", null, null, - null + null, + Duration.standardHours(2) ); private static final String ACCESS_KEY = "test-access-key"; private static final String SECRET_KEY = "test-secret-key"; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 2ef391484008..757f6735a700 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -89,6 +89,7 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.AfterClass; @@ -785,7 +786,8 @@ public void testRunWithMinimumMessageTime() throws Exception "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ) ); @@ -847,7 +849,8 @@ public void testRunWithMaximumMessageTime() throws Exception "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ) ); @@ -1946,7 +1949,8 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ), context ); @@ -2108,7 +2112,8 @@ public void testSequencesFromContext() throws IOException "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ), context ); @@ -2309,7 +2314,8 @@ private KinesisIndexTask createTask( "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ), null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 122a8e1c5ae8..aec9cae152fd 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -94,6 +94,7 @@ import org.easymock.EasyMockSupport; import org.easymock.IAnswer; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -5563,7 +5564,8 @@ private KinesisIndexTask createKinesisIndexTask( "awsEndpoint", null, null, - null + null, + Duration.standardHours(2) ), Collections.emptyMap(), false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 41cd084cd960..d43b83d78d07 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; -import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever; @@ -244,32 +243,6 @@ public StreamAppenderatorDriver newDriver( ); } - public boolean withinMinMaxRecordTime(final InputRow row) - { - final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent() - && ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp()); - - final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent() - && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp()); - - if (log.isDebugEnabled()) { - if (beforeMinimumMessageTime) { - log.debug( - "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", - row.getTimestamp(), - ioConfig.getMinimumMessageTime().get() - ); - } else if (afterMaximumMessageTime) { - log.debug( - "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", - row.getTimestamp(), - ioConfig.getMaximumMessageTime().get() - ); - } - } - return !beforeMinimumMessageTime && !afterMaximumMessageTime; - } - @Override public String getTaskAllocatorId() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index 6526bb81b1e3..b916affe65f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.segment.indexing.IOConfig; import org.joda.time.DateTime; +import org.joda.time.Duration; import javax.annotation.Nullable; @@ -42,6 +43,7 @@ public abstract class SeekableStreamIndexTaskIOConfig minimumMessageTime; private final Optional maximumMessageTime; private final InputFormat inputFormat; + private final Duration taskDuration; public SeekableStreamIndexTaskIOConfig( @Nullable final Integer taskGroupId, // can be null for backward compabitility @@ -51,7 +53,8 @@ public SeekableStreamIndexTaskIOConfig( final Boolean useTransaction, final DateTime minimumMessageTime, final DateTime maximumMessageTime, - @Nullable final InputFormat inputFormat + @Nullable final InputFormat inputFormat, + final Duration taskDuration ) { this.taskGroupId = taskGroupId; @@ -62,6 +65,7 @@ public SeekableStreamIndexTaskIOConfig( this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.inputFormat = inputFormat; + this.taskDuration = taskDuration; Preconditions.checkArgument( startSequenceNumbers.getStream().equals(endSequenceNumbers.getStream()), @@ -134,4 +138,9 @@ public InputFormat getInputFormat() { return inputFormat; } + + public Duration getTaskDuration() + { + return taskDuration; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index ab437eb7a60a..07902f8e35d6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -76,6 +77,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -246,6 +248,9 @@ public enum Status private final Map partitionsThroughput = new HashMap<>(); + private volatile Optional minMessageTime; + private volatile Optional maxMessageTime; + public SeekableStreamIndexTaskRunner( final SeekableStreamIndexTask task, @Nullable final InputRowParser parser, @@ -267,6 +272,16 @@ public SeekableStreamIndexTaskRunner( this.ingestionState = IngestionState.NOT_STARTED; this.lockGranularityToUse = lockGranularityToUse; + minMessageTime = ioConfig.getMinimumMessageTime(); + maxMessageTime = ioConfig.getMaximumMessageTime(); + + Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d") + .scheduleWithFixedDelay( + this::addTaskDurationToMinMaxTimes, + ioConfig.getTaskDuration().getStandardMinutes(), + ioConfig.getTaskDuration().getStandardMinutes(), + TimeUnit.MINUTES + ); resetNextCheckpointTime(); } @@ -388,7 +403,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception inputRowSchema, task.getDataSchema().getTransformSpec(), toolbox.getIndexingTmpDir(), - row -> row != null && task.withinMinMaxRecordTime(row), + row -> row != null && withinMinMaxRecordTime(row), rowIngestionMeters, parseExceptionHandler ); @@ -2092,4 +2107,47 @@ protected abstract void possiblyResetDataSourceMetadata( protected abstract boolean isEndOffsetExclusive(); protected abstract TypeReference>> getSequenceMetadataTypeReference(); + + + private void addTaskDurationToMinMaxTimes() + { + if (minMessageTime.isPresent()) { + minMessageTime = Optional.of(minMessageTime.get() + .plusMinutes(ioConfig.getTaskDuration() + .toStandardMinutes() + .getMinutes())); + } + + if (maxMessageTime.isPresent()) { + maxMessageTime = Optional.of(maxMessageTime.get() + .plusMinutes(ioConfig.getTaskDuration() + .toStandardMinutes() + .getMinutes())); + } + } + + public boolean withinMinMaxRecordTime(final InputRow row) + { + final boolean beforeMinimumMessageTime = minMessageTime.isPresent() && + minMessageTime.get().isAfter(row.getTimestamp()); + final boolean afterMaximumMessageTime = maxMessageTime.isPresent() && + maxMessageTime.get().isBefore(row.getTimestamp()); + + if (log.isDebugEnabled()) { + if (beforeMinimumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", + row.getTimestamp(), + ioConfig.getMinimumMessageTime().get() + ); + } else if (afterMaximumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", + row.getTimestamp(), + ioConfig.getMaximumMessageTime().get() + ); + } + } + return !beforeMinimumMessageTime && !afterMaximumMessageTime; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java index 8b1bd4fb0963..c0ed6984f19c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java @@ -44,6 +44,7 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.ResourceType; import org.easymock.EasyMock; +import org.joda.time.Duration; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -380,7 +381,8 @@ public TestSeekableStreamIndexTaskIOConfig() false, DateTimes.nowUtc().minusDays(2), DateTimes.nowUtc(), - new CsvInputFormat(null, null, true, null, 0, null) + new CsvInputFormat(null, null, true, null, 0, null), + Duration.standardHours(2) ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java new file mode 100644 index 000000000000..847b5f29cc15 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.security.AuthorizerMapper; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +@RunWith(MockitoJUnitRunner.class) +public class SeekableStreamIndexTaskRunnerTest +{ + @Mock + private InputRow row; + + @Mock + private SeekableStreamIndexTask task; + + @Test + public void testWithinMinMaxTime() + { + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2") + ) + ); + DataSchema schema = + DataSchema.builder() + .withDataSource("datasource") + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) + ) + .build(); + + SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class); + SeekableStreamStartSequenceNumbers sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); + SeekableStreamEndSequenceNumbers endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); + + DateTime now = DateTimes.nowUtc(); + + Mockito.when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(2)); + Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().plusHours(2))); + Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().minusHours(2))); + Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); + Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers); + Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers); + + Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(sequenceNumbers.getStream()).thenReturn("test"); + + Mockito.when(task.getDataSchema()).thenReturn(schema); + Mockito.when(task.getIOConfig()).thenReturn(ioConfig); + Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); + + TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK); + + Mockito.when(row.getTimestamp()).thenReturn(now); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1)); + Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1)); + Assert.assertFalse(runner.withinMinMaxRecordTime(row)); + } + + @Test + public void testWithinMinMaxTimeNotPopulated() + { + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2") + ) + ); + DataSchema schema = + DataSchema.builder() + .withDataSource("datasource") + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null) + ) + .build(); + + SeekableStreamIndexTaskTuningConfig tuningConfig = Mockito.mock(SeekableStreamIndexTaskTuningConfig.class); + SeekableStreamIndexTaskIOConfig ioConfig = Mockito.mock(SeekableStreamIndexTaskIOConfig.class); + SeekableStreamStartSequenceNumbers sequenceNumbers = Mockito.mock(SeekableStreamStartSequenceNumbers.class); + SeekableStreamEndSequenceNumbers endSequenceNumbers = Mockito.mock(SeekableStreamEndSequenceNumbers.class); + + DateTime now = DateTimes.nowUtc(); + + Mockito.when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(2)); + Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.absent()); + Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.absent()); + Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); + Mockito.when(ioConfig.getStartSequenceNumbers()).thenReturn(sequenceNumbers); + Mockito.when(ioConfig.getEndSequenceNumbers()).thenReturn(endSequenceNumbers); + + Mockito.when(endSequenceNumbers.getPartitionSequenceNumberMap()).thenReturn(ImmutableMap.of()); + Mockito.when(sequenceNumbers.getStream()).thenReturn("test"); + + Mockito.when(task.getDataSchema()).thenReturn(schema); + Mockito.when(task.getIOConfig()).thenReturn(ioConfig); + Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); + TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK); + + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + } + + static class TestasbleSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner + { + public TestasbleSeekableStreamIndexTaskRunner( + SeekableStreamIndexTask task, + @Nullable InputRowParser parser, + AuthorizerMapper authorizerMapper, + LockGranularity lockGranularityToUse + ) + { + super(task, parser, authorizerMapper, lockGranularityToUse); + } + + @Override + protected boolean isEndOfShard(Object seqNum) + { + return false; + } + + @Nullable + @Override + protected TreeMap getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString) + throws IOException + { + return null; + } + + @Override + protected Object getNextStartOffset(Object sequenceNumber) + { + return null; + } + + @Override + protected SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata(ObjectMapper mapper, Object object) + { + return null; + } + + @Override + protected List getRecords(RecordSupplier recordSupplier, TaskToolbox toolbox) + throws Exception + { + return null; + } + + @Override + protected SeekableStreamDataSourceMetadata createDataSourceMetadata(SeekableStreamSequenceNumbers partitions) + { + return null; + } + + @Override + protected OrderedSequenceNumber createSequenceNumber(Object sequenceNumber) + { + return null; + } + + @Override + protected boolean isEndOffsetExclusive() + { + return false; + } + + @Override + protected TypeReference> getSequenceMetadataTypeReference() + { + return null; + } + + @Override + protected void possiblyResetDataSourceMetadata(TaskToolbox toolbox, RecordSupplier recordSupplier, Set assignment) + { + + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 3281360f5806..4ce4a95a7650 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -196,7 +196,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration() ) { }; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b97..6edf77f53c71 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2807,7 +2807,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration() ) { }; @@ -3166,7 +3167,8 @@ private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt( true, minimumMessageTime, maximumMessageTime, - ioConfig.getInputFormat() + ioConfig.getInputFormat(), + ioConfig.getTaskDuration() ) { }; From 05dce760b0299c6835faa6b5987bcd39d1f7e804 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Sun, 3 Nov 2024 11:29:45 -0600 Subject: [PATCH 03/12] backward --- .../SeekableStreamIndexTaskIOConfig.java | 2 +- .../SeekableStreamIndexTaskRunner.java | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index b916affe65f1..2318b0c81e81 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -54,7 +54,7 @@ public SeekableStreamIndexTaskIOConfig( final DateTime minimumMessageTime, final DateTime maximumMessageTime, @Nullable final InputFormat inputFormat, - final Duration taskDuration + @Nullable final Duration taskDuration // can be null for backward compabitility ) { this.taskGroupId = taskGroupId; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 07902f8e35d6..744afcfd5038 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -275,13 +275,15 @@ public SeekableStreamIndexTaskRunner( minMessageTime = ioConfig.getMinimumMessageTime(); maxMessageTime = ioConfig.getMaximumMessageTime(); - Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d") - .scheduleWithFixedDelay( - this::addTaskDurationToMinMaxTimes, - ioConfig.getTaskDuration().getStandardMinutes(), - ioConfig.getTaskDuration().getStandardMinutes(), - TimeUnit.MINUTES - ); + if (ioConfig.getTaskDuration() != null) { + Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d") + .scheduleWithFixedDelay( + this::addTaskDurationToMinMaxTimes, + ioConfig.getTaskDuration().getStandardMinutes(), + ioConfig.getTaskDuration().getStandardMinutes(), + TimeUnit.MINUTES + ); + } resetNextCheckpointTime(); } From f92873a2e3fd3e4c858765bc28d23cef191463bc Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Sun, 3 Nov 2024 13:18:10 -0600 Subject: [PATCH 04/12] tests --- .../RabbitStreamSupervisorTest.java | 45 +++++++++++++++++++ .../SeekableStreamIndexTaskRunnerTest.java | 3 -- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index e52ca2f29b64..381cab2289b3 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.JsonInputFormat; @@ -50,6 +51,7 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMockSupport; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -366,4 +368,47 @@ public void testReportPayload() Assert.assertEquals(30 * 60, payload.getDurationSeconds()); } + @Test + public void testCreateTaskIOConfig() + { + supervisor = getSupervisor( + 1, + 1, + false, + "PT30M", + null, + null, + RabbitStreamSupervisorTest.dataSchema, + tuningConfig + ); + + Assert.assertEquals(supervisor.createTaskIoConfig( + 1, + ImmutableMap.of(), + ImmutableMap.of(), + "test", + null, + null, + ImmutableSet.of(), + new RabbitStreamSupervisorIOConfig( + STREAM, // stream + URI, // uri + INPUT_FORMAT, // inputFormat + 1, // replicas + 1, // taskCount + new Period("PT30M"), // taskDuration + null, // consumerProperties + null, // autoscalerConfig + 400L, // poll timeout + new Period("P1D"), // start delat + new Period("PT30M"), // period + new Period("PT30S"), // completiontimeout + false, // useearliest + null, // latemessagerejection + null, // early message rejection + null, // latemessagerejectionstartdatetime + 1 + ) + ).getTaskDuration(), Duration.standardMinutes(30)); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java index 847b5f29cc15..4e5739f6fcfb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -49,7 +49,6 @@ import org.mockito.junit.MockitoJUnitRunner; import javax.annotation.Nullable; -import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -184,7 +183,6 @@ protected boolean isEndOfShard(Object seqNum) @Nullable @Override protected TreeMap getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString) - throws IOException { return null; } @@ -203,7 +201,6 @@ protected SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata(Obj @Override protected List getRecords(RecordSupplier recordSupplier, TaskToolbox toolbox) - throws Exception { return null; } From 0a67e03f2cd5067f835bfa3d228e024a2f9a77f5 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Nov 2024 13:20:39 -0600 Subject: [PATCH 05/12] remove unwanted changes --- .../indexing/kafka/KafkaConsumerMonitor.java | 8 +--- .../druid/indexing/kafka/KafkaIndexTask.java | 2 +- .../indexing/kafka/KafkaRecordSupplier.java | 11 ++---- .../indexing/kafka/KafkaSamplerSpec.java | 2 +- .../kafka/supervisor/KafkaSupervisor.java | 3 +- .../kafka/KafkaRecordSupplierTest.java | 37 ++++++++----------- .../kafka/supervisor/KafkaSupervisorTest.java | 2 +- 7 files changed, 25 insertions(+), 40 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java index 4ed2493da550..9ea0e5a0106c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java @@ -43,20 +43,17 @@ public class KafkaConsumerMonitor extends AbstractMonitor ImmutableMap.builder() .put("bytes-consumed-total", "kafka/consumer/bytesConsumed") .put("records-consumed-total", "kafka/consumer/recordsConsumed") - .put("io-wait-time-ns-total", "kafka/consumer/io/time") .build(); private static final String TOPIC_TAG = "topic"; private static final String DATASOURCE_TAG = "datasource"; private static final Set TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG); private final KafkaConsumer consumer; - private final String datasource; private final Map counters = new HashMap<>(); - public KafkaConsumerMonitor(final KafkaConsumer consumer, String datasource) + public KafkaConsumerMonitor(final KafkaConsumer consumer) { this.consumer = consumer; - this.datasource = datasource; } @Override @@ -75,9 +72,6 @@ public boolean doMonitor(final ServiceEmitter emitter) if (newValue != priorValue) { final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension(TOPIC_TAG, topic); - if (datasource != null) { - builder.setDimension(DATASOURCE_TAG, datasource); - } emitter.emit(builder.setMetric(METRICS.get(metricName.name()), newValue - priorValue)); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index 2834f2d31ce5..cb77cbae16e7 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -111,7 +111,7 @@ protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) final KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(), - kafkaIndexTaskIOConfig.isMultiTopic(), dataSchema.getDataSource() + kafkaIndexTaskIOConfig.isMultiTopic() ); if (toolbox.getMonitorScheduler() != null) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 5b8ea4f410e0..a88300552e2a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -45,7 +45,6 @@ import org.apache.kafka.common.serialization.Deserializer; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Type; @@ -79,23 +78,21 @@ public KafkaRecordSupplier( Map consumerProperties, ObjectMapper sortingMapper, KafkaConfigOverrides configOverrides, - boolean multiTopic, - @Nullable String datasource + boolean multiTopic ) { - this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic, datasource); + this(getKafkaConsumer(sortingMapper, consumerProperties, configOverrides), multiTopic); } @VisibleForTesting public KafkaRecordSupplier( KafkaConsumer consumer, - boolean multiTopic, - @Nullable String datasource + boolean multiTopic ) { this.consumer = consumer; this.multiTopic = multiTopic; - this.monitor = new KafkaConsumerMonitor(consumer, datasource); + this.monitor = new KafkaConsumerMonitor(consumer); } @Override diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java index e941bff3885f..e0683fb605bb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java @@ -74,7 +74,7 @@ protected KafkaRecordSupplier createRecordSupplier() KafkaSupervisorIOConfig kafkaSupervisorIOConfig = (KafkaSupervisorIOConfig) ioConfig; return new KafkaRecordSupplier(props, objectMapper, kafkaSupervisorIOConfig.getConfigOverrides(), - kafkaSupervisorIOConfig.isMultiTopic(), null + kafkaSupervisorIOConfig.isMultiTopic() ); } finally { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index cff3e508aa0c..da6f41ff8475 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -137,8 +137,7 @@ protected RecordSupplier setupReco spec.getIoConfig().getConsumerProperties(), sortingMapper, spec.getIoConfig().getConfigOverrides(), - spec.getIoConfig().isMultiTopic(), - spec.getId() + spec.getIoConfig().isMultiTopic() ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index a12357f9dfc5..bef7096db512 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -240,7 +240,7 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -265,7 +265,7 @@ public void testMultiTopicSupplierSetup() throws ExecutionException, Interrupted insertData(); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true); String stream = Pattern.quote(TOPIC) + "|" + Pattern.quote(otherTopic); Set partitions = recordSupplier.getPartitionIds(stream); @@ -301,8 +301,7 @@ public void testSupplierSetupCustomDeserializer() throws ExecutionException, Int properties, OBJECT_MAPPER, null, - false, - STREAM + false ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -330,8 +329,7 @@ public void testSupplierSetupCustomDeserializerRequiresParameter() properties, OBJECT_MAPPER, null, - false, - STREAM + false ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated @@ -350,8 +348,7 @@ public void testSupplierSetupCustomDeserializerRequiresParameterButMissingIt() properties, OBJECT_MAPPER, null, - false, - STREAM + false ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); //just test recordSupplier is initiated @@ -378,8 +375,7 @@ public void testPollCustomDeserializer() throws InterruptedException, ExecutionE properties, OBJECT_MAPPER, null, - false, - STREAM + false ); recordSupplier.assign(partitions); @@ -417,8 +413,7 @@ public void testPoll() throws InterruptedException, ExecutionException KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, - false, - STREAM + false ); final Monitor monitor = recordSupplier.monitor(); @@ -469,7 +464,7 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -539,7 +534,7 @@ public void testSeek() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -582,7 +577,7 @@ public void testSeekToLatest() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -615,7 +610,7 @@ public void testSeekUnassigned() throws InterruptedException, ExecutionException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); recordSupplier.assign(partitions); @@ -641,7 +636,7 @@ public void testPosition() throws ExecutionException, InterruptedException ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -676,7 +671,7 @@ public void testPosition() throws ExecutionException, InterruptedException public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -688,7 +683,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShoul public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -700,7 +695,7 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetSho public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); @@ -712,7 +707,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldR public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldReturnsValidNonNull() { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( - KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false, STREAM); + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index e04b1f030f8b..6b78a2bdb29a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -5780,7 +5780,7 @@ protected RecordSupplier setupReco Deserializer valueDeserializerObject = new ByteArrayDeserializer(); return new KafkaRecordSupplier( new KafkaConsumer<>(props, keyDeserializerObject, valueDeserializerObject), - getIoConfig().isMultiTopic(), null + getIoConfig().isMultiTopic() ); } From 69ea3446d2c02941400703a4af8dd156084e952d Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Nov 2024 13:49:18 -0600 Subject: [PATCH 06/12] comments --- .../RabbitStreamSupervisorTest.java | 7 ++-- .../SeekableStreamIndexTaskIOConfig.java | 2 ++ .../SeekableStreamIndexTaskRunner.java | 34 ++++++------------- 3 files changed, 17 insertions(+), 26 deletions(-) diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index 381cab2289b3..817c46fa7b1f 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -38,6 +38,7 @@ import org.apache.druid.indexing.rabbitstream.RabbitStreamIndexTaskClientFactory; import org.apache.druid.indexing.rabbitstream.RabbitStreamRecordSupplier; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -382,7 +383,7 @@ public void testCreateTaskIOConfig() tuningConfig ); - Assert.assertEquals(supervisor.createTaskIoConfig( + SeekableStreamIndexTaskIOConfig ioConfig = supervisor.createTaskIoConfig( 1, ImmutableMap.of(), ImmutableMap.of(), @@ -409,6 +410,8 @@ public void testCreateTaskIOConfig() null, // latemessagerejectionstartdatetime 1 ) - ).getTaskDuration(), Duration.standardMinutes(30)); + ); + + Assert.assertEquals(ioConfig.getTaskDuration(), Duration.standardMinutes(30)); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index 2318b0c81e81..d3da738ef7b8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -139,6 +139,8 @@ public InputFormat getInputFormat() return inputFormat; } + @Nullable + @JsonProperty public Duration getTaskDuration() { return taskDuration; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 744afcfd5038..fbe52d87b83f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -248,8 +247,8 @@ public enum Status private final Map partitionsThroughput = new HashMap<>(); - private volatile Optional minMessageTime; - private volatile Optional maxMessageTime; + private volatile DateTime minMessageTime; + private volatile DateTime maxMessageTime; public SeekableStreamIndexTaskRunner( final SeekableStreamIndexTask task, @@ -272,8 +271,8 @@ public SeekableStreamIndexTaskRunner( this.ingestionState = IngestionState.NOT_STARTED; this.lockGranularityToUse = lockGranularityToUse; - minMessageTime = ioConfig.getMinimumMessageTime(); - maxMessageTime = ioConfig.getMaximumMessageTime(); + minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN); + maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX); if (ioConfig.getTaskDuration() != null) { Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d") @@ -2113,40 +2112,27 @@ protected abstract void possiblyResetDataSourceMetadata( private void addTaskDurationToMinMaxTimes() { - if (minMessageTime.isPresent()) { - minMessageTime = Optional.of(minMessageTime.get() - .plusMinutes(ioConfig.getTaskDuration() - .toStandardMinutes() - .getMinutes())); - } - - if (maxMessageTime.isPresent()) { - maxMessageTime = Optional.of(maxMessageTime.get() - .plusMinutes(ioConfig.getTaskDuration() - .toStandardMinutes() - .getMinutes())); - } + minMessageTime = minMessageTime.plusMinutes(ioConfig.getTaskDuration().toStandardMinutes().getMinutes()); + maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getTaskDuration().toStandardMinutes().getMinutes()); } public boolean withinMinMaxRecordTime(final InputRow row) { - final boolean beforeMinimumMessageTime = minMessageTime.isPresent() && - minMessageTime.get().isAfter(row.getTimestamp()); - final boolean afterMaximumMessageTime = maxMessageTime.isPresent() && - maxMessageTime.get().isBefore(row.getTimestamp()); + final boolean beforeMinimumMessageTime = minMessageTime.isAfter(row.getTimestamp()); + final boolean afterMaximumMessageTime = maxMessageTime.isBefore(row.getTimestamp()); if (log.isDebugEnabled()) { if (beforeMinimumMessageTime) { log.debug( "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", row.getTimestamp(), - ioConfig.getMinimumMessageTime().get() + minMessageTime ); } else if (afterMaximumMessageTime) { log.debug( "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", row.getTimestamp(), - ioConfig.getMaximumMessageTime().get() + maxMessageTime ); } } From bba948637cfd52a35e599f9c1246544fa45063af Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 5 Nov 2024 13:51:54 -0600 Subject: [PATCH 07/12] comments --- .../rabbitstream/supervisor/RabbitStreamSupervisorTest.java | 2 +- .../org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java | 1 - .../java/org/apache/druid/indexing/kafka/KafkaIndexTask.java | 3 +-- .../apache/druid/indexing/kafka/KafkaRecordSupplierTest.java | 1 - 4 files changed, 2 insertions(+), 5 deletions(-) diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index 817c46fa7b1f..5694d3440118 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -412,6 +412,6 @@ public void testCreateTaskIOConfig() ) ); - Assert.assertEquals(ioConfig.getTaskDuration(), Duration.standardMinutes(30)); + Assert.assertEquals(Duration.standardMinutes(30), ioConfig.getTaskDuration()); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java index 9ea0e5a0106c..6a8c6c149051 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java @@ -45,7 +45,6 @@ public class KafkaConsumerMonitor extends AbstractMonitor .put("records-consumed-total", "kafka/consumer/recordsConsumed") .build(); private static final String TOPIC_TAG = "topic"; - private static final String DATASOURCE_TAG = "datasource"; private static final Set TOPIC_METRIC_TAGS = ImmutableSet.of("client-id", TOPIC_TAG); private final KafkaConsumer consumer; diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index cb77cbae16e7..e5ff77467cdb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -111,8 +111,7 @@ protected KafkaRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) final KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier(props, configMapper, kafkaIndexTaskIOConfig.getConfigOverrides(), - kafkaIndexTaskIOConfig.isMultiTopic() - ); + kafkaIndexTaskIOConfig.isMultiTopic()); if (toolbox.getMonitorScheduler() != null) { toolbox.getMonitorScheduler().addMonitor(recordSupplier.monitor()); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index bef7096db512..bfd81464ba2f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -73,7 +73,6 @@ public class KafkaRecordSupplierTest private static KafkaTopicPartition PARTITION_1 = new KafkaTopicPartition(false, null, 1); private static String TOPIC = "topic"; - private static String STREAM = "stream"; private static int TOPIC_POS_FIX = 0; private static TestingCluster ZK_SERVER; private static TestBroker KAFKA_SERVER; From 589d415e3b86b6bbd83c5f4bc6c3702e444a0843 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Wed, 6 Nov 2024 09:46:05 -0600 Subject: [PATCH 08/12] coverage --- .../SeekableStreamIndexTaskRunnerTest.java | 7 ++++- .../src/test/resources/log4j2.xml | 27 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 indexing-service/src/test/resources/log4j2.xml diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java index 4e5739f6fcfb..bd18e47a2c76 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -142,7 +142,8 @@ public void testWithinMinMaxTimeNotPopulated() DateTime now = DateTimes.nowUtc(); - Mockito.when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(2)); + Mockito.when(ioConfig.getTaskDuration()).thenReturn(null); + // min max time not populated. Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.absent()); Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.absent()); Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); @@ -158,7 +159,11 @@ public void testWithinMinMaxTimeNotPopulated() TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, null, LockGranularity.TIME_CHUNK); Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.minusHours(2).minusMinutes(1)); Assert.assertTrue(runner.withinMinMaxRecordTime(row)); + + Mockito.when(row.getTimestamp()).thenReturn(now.plusHours(2).plusMinutes(1)); Assert.assertTrue(runner.withinMinMaxRecordTime(row)); } diff --git a/indexing-service/src/test/resources/log4j2.xml b/indexing-service/src/test/resources/log4j2.xml new file mode 100644 index 000000000000..ca81b189289d --- /dev/null +++ b/indexing-service/src/test/resources/log4j2.xml @@ -0,0 +1,27 @@ + + + + + + + + + + From de43cad60dca9b78d8b475f03dbde1d6b54e118f Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Thu, 7 Nov 2024 07:33:54 -0600 Subject: [PATCH 09/12] change name --- .../SeekableStreamIndexTaskIOConfig.java | 11 +++++------ .../SeekableStreamIndexTaskRunner.java | 15 +++++++-------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index d3da738ef7b8..b0b22dba93d8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -26,7 +26,6 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.segment.indexing.IOConfig; import org.joda.time.DateTime; -import org.joda.time.Duration; import javax.annotation.Nullable; @@ -43,7 +42,7 @@ public abstract class SeekableStreamIndexTaskIOConfig minimumMessageTime; private final Optional maximumMessageTime; private final InputFormat inputFormat; - private final Duration taskDuration; + private final Integer refreshRejectionPeriodsInMinutes; public SeekableStreamIndexTaskIOConfig( @Nullable final Integer taskGroupId, // can be null for backward compabitility @@ -54,7 +53,7 @@ public SeekableStreamIndexTaskIOConfig( final DateTime minimumMessageTime, final DateTime maximumMessageTime, @Nullable final InputFormat inputFormat, - @Nullable final Duration taskDuration // can be null for backward compabitility + @Nullable final Integer refreshRejectionPeriodsInMinutes // can be null for backward compabitility ) { this.taskGroupId = taskGroupId; @@ -65,7 +64,7 @@ public SeekableStreamIndexTaskIOConfig( this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.inputFormat = inputFormat; - this.taskDuration = taskDuration; + this.refreshRejectionPeriodsInMinutes = refreshRejectionPeriodsInMinutes; Preconditions.checkArgument( startSequenceNumbers.getStream().equals(endSequenceNumbers.getStream()), @@ -141,8 +140,8 @@ public InputFormat getInputFormat() @Nullable @JsonProperty - public Duration getTaskDuration() + public Integer getRefreshRejectionPeriodsInMinutes() { - return taskDuration; + return refreshRejectionPeriodsInMinutes; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index fbe52d87b83f..4e38a952999b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -274,12 +274,12 @@ public SeekableStreamIndexTaskRunner( minMessageTime = ioConfig.getMinimumMessageTime().or(DateTimes.MIN); maxMessageTime = ioConfig.getMaximumMessageTime().or(DateTimes.MAX); - if (ioConfig.getTaskDuration() != null) { + if (ioConfig.getRefreshRejectionPeriodsInMinutes() != null) { Execs.scheduledSingleThreaded("RejectionPeriodUpdater-Exec--%d") .scheduleWithFixedDelay( - this::addTaskDurationToMinMaxTimes, - ioConfig.getTaskDuration().getStandardMinutes(), - ioConfig.getTaskDuration().getStandardMinutes(), + this::refreshMinMaxMessageTime, + ioConfig.getRefreshRejectionPeriodsInMinutes(), + ioConfig.getRefreshRejectionPeriodsInMinutes(), TimeUnit.MINUTES ); } @@ -2109,11 +2109,10 @@ protected abstract void possiblyResetDataSourceMetadata( protected abstract TypeReference>> getSequenceMetadataTypeReference(); - - private void addTaskDurationToMinMaxTimes() + private void refreshMinMaxMessageTime() { - minMessageTime = minMessageTime.plusMinutes(ioConfig.getTaskDuration().toStandardMinutes().getMinutes()); - maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getTaskDuration().toStandardMinutes().getMinutes()); + minMessageTime = minMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes()); + maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes()); } public boolean withinMinMaxRecordTime(final InputRow row) From ad8db680e9e6ba504dc75d61575b6eebe2f4edee Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Thu, 7 Nov 2024 10:03:46 -0600 Subject: [PATCH 10/12] fixes --- .../RabbitStreamIndexTaskIOConfig.java | 2 +- .../RabbitStreamSupervisorTest.java | 3 +- .../kafka/KafkaIndexTaskIOConfig.java | 9 +- .../kafka/supervisor/KafkaSupervisor.java | 2 +- .../indexing/kafka/KafkaIndexTaskTest.java | 106 +++++++++--------- .../kafka/supervisor/KafkaSupervisorTest.java | 4 +- .../kinesis/KinesisIndexTaskIOConfig.java | 2 +- .../SeekableStreamIndexTaskIOConfig.java | 6 +- .../SeekableStreamIndexTaskRunner.java | 4 +- ...SeekableStreamIndexTaskRunnerAuthTest.java | 2 +- .../SeekableStreamIndexTaskRunnerTest.java | 5 +- .../SeekableStreamSupervisorSpecTest.java | 2 +- .../SeekableStreamSupervisorStateTest.java | 4 +- 13 files changed, 74 insertions(+), 77 deletions(-) diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java index 8739fbed9b55..4c8b0d292eda 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java @@ -67,7 +67,7 @@ public RabbitStreamIndexTaskIOConfig( minimumMessageTime, maximumMessageTime, inputFormat, - taskDuration + taskDuration.getStandardMinutes() ); this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index 5694d3440118..b2ac927b17e1 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -52,7 +52,6 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMockSupport; -import org.joda.time.Duration; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -412,6 +411,6 @@ public void testCreateTaskIOConfig() ) ); - Assert.assertEquals(Duration.standardMinutes(30), ioConfig.getTaskDuration()); + Assert.assertEquals(30L, ioConfig.getRefreshRejectionPeriodsInMinutes().longValue()); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java index 3d21d2d8aa46..07c0f80fbe83 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java @@ -30,7 +30,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides; import org.joda.time.DateTime; -import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Map; @@ -65,7 +64,7 @@ public KafkaIndexTaskIOConfig( @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("configOverrides") @Nullable KafkaConfigOverrides configOverrides, @JsonProperty("multiTopic") @Nullable Boolean multiTopic, - @JsonProperty("taskDuration") Duration taskDuration + @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes ) { super( @@ -79,7 +78,7 @@ public KafkaIndexTaskIOConfig( minimumMessageTime, maximumMessageTime, inputFormat, - taskDuration + refreshRejectionPeriodsInMinutes ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); @@ -111,7 +110,7 @@ public KafkaIndexTaskIOConfig( DateTime maximumMessageTime, InputFormat inputFormat, KafkaConfigOverrides configOverrides, - Duration taskDuration + Long refreshRejectionPeriodsInMinutes ) { this( @@ -129,7 +128,7 @@ public KafkaIndexTaskIOConfig( inputFormat, configOverrides, KafkaSupervisorIOConfig.DEFAULT_IS_MULTI_TOPIC, - taskDuration + refreshRejectionPeriodsInMinutes ); } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index da6f41ff8475..2618c22495ea 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -219,7 +219,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ioConfig.getInputFormat(), kafkaIoConfig.getConfigOverrides(), kafkaIoConfig.isMultiTopic(), - ioConfig.getTaskDuration() + ioConfig.getTaskDuration().getStandardMinutes() ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 854d9234ced9..8de61fd0285b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -359,7 +359,7 @@ public void testRunAfterDataInserted() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); Assert.assertTrue(task.supportsQueries()); @@ -416,7 +416,7 @@ public void testIngestNullColumnAfterDataInserted() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -465,7 +465,7 @@ public void testIngestNullColumnAfterDataInserted_storeEmptyColumnsOff_shouldNot null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); task.addToContext(Tasks.STORE_EMPTY_COLUMNS_KEY, false); @@ -501,7 +501,7 @@ public void testRunAfterDataInsertedWithLegacyParser() throws Exception null, null, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -543,7 +543,7 @@ public void testRunBeforeDataInserted() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -595,7 +595,7 @@ public void testRunAfterDataInsertedLiveReport() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -673,7 +673,7 @@ public void testIncrementalHandOff() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -777,7 +777,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -904,7 +904,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -983,7 +983,7 @@ public void testCheckpointResetWithSameEndOffsets() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -1047,7 +1047,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask staleReplica = createTask( @@ -1064,7 +1064,7 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1131,7 +1131,7 @@ public void testRunWithMinimumMessageTime() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1180,7 +1180,7 @@ public void testRunWithMaximumMessageTime() throws Exception DateTimes.of("2010"), INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1238,7 +1238,7 @@ public void testRunWithTransformSpec() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1310,7 +1310,7 @@ public void testKafkaRecordEntityInputFormat() throws Exception null, new TestKafkaInputFormat(INPUT_FORMAT), null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); Assert.assertTrue(task.supportsQueries()); @@ -1383,7 +1383,7 @@ public void testKafkaInputFormat() throws Exception null, KAFKA_INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); Assert.assertTrue(task.supportsQueries()); @@ -1435,7 +1435,7 @@ public void testRunOnNothing() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1471,7 +1471,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1518,7 +1518,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1570,7 +1570,7 @@ public void testReportParseExceptions() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1609,7 +1609,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1702,7 +1702,7 @@ public void testMultipleParseExceptionsFailure() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1769,7 +1769,7 @@ public void testRunReplicas() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -1786,7 +1786,7 @@ public void testRunReplicas() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1835,7 +1835,7 @@ public void testRunConflicting() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -1852,7 +1852,7 @@ public void testRunConflicting() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1903,7 +1903,7 @@ public void testRunConflictingWithoutTransactions() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -1920,7 +1920,7 @@ public void testRunConflictingWithoutTransactions() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1969,7 +1969,7 @@ public void testRunOneTaskTwoPartitions() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2016,7 +2016,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final KafkaIndexTask task2 = createTask( @@ -2033,7 +2033,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2084,7 +2084,7 @@ public void testRestore() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2119,7 +2119,7 @@ public void testRestore() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2170,7 +2170,7 @@ public void testRestoreAfterPersistingSequences() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2214,7 +2214,7 @@ public void testRestoreAfterPersistingSequences() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2266,7 +2266,7 @@ public void testRunWithPauseAndResume() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2338,7 +2338,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2376,7 +2376,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2424,7 +2424,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ), context ); @@ -2469,7 +2469,7 @@ public void testRunWithDuplicateRequest() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2509,7 +2509,7 @@ public void testRunTransactionModeRollback() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2591,7 +2591,7 @@ public void testRunUnTransactionMode() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2652,7 +2652,7 @@ public void testCanStartFromLaterThanEarliestOffset() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); final ListenableFuture future = runTask(task); @@ -2676,7 +2676,7 @@ public void testRunWithoutDataInserted() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2723,7 +2723,7 @@ public void testSerde() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -2756,7 +2756,7 @@ public void testCorrectInputSources() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3019,7 +3019,7 @@ public void testMultipleLinesJSONText() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3081,7 +3081,7 @@ public void testParseExceptionsInIteratorConstructionSuccess() throws Exception null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3154,7 +3154,7 @@ public void testNoParseExceptionsTaskSucceeds() throws Exception null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3229,7 +3229,7 @@ public void testParseExceptionsBeyondThresholdTaskFails() throws Exception null, new TestKafkaFormatWithMalformedDataDetection(INPUT_FORMAT), null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3282,7 +3282,7 @@ public void testCompletionReportPartitionStats() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -3337,7 +3337,7 @@ public void testCompletionReportMultiplePartitionStats() throws Exception null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 6b78a2bdb29a..8f2f3167ecd9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -498,7 +498,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ), new KafkaIndexTaskTuningConfig( null, @@ -5644,7 +5644,7 @@ private KafkaIndexTask createKafkaIndexTask( maximumMessageTime, INPUT_FORMAT, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ), Collections.emptyMap(), OBJECT_MAPPER diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 065bf9636ce8..60f1d1230e81 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -92,7 +92,7 @@ public KinesisIndexTaskIOConfig( minimumMessageTime, maximumMessageTime, inputFormat, - taskDuration + taskDuration.getStandardMinutes() ); Preconditions.checkArgument( getEndSequenceNumbers().getPartitionSequenceNumberMap() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index b0b22dba93d8..928149cdee44 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -42,7 +42,7 @@ public abstract class SeekableStreamIndexTaskIOConfig minimumMessageTime; private final Optional maximumMessageTime; private final InputFormat inputFormat; - private final Integer refreshRejectionPeriodsInMinutes; + private final Long refreshRejectionPeriodsInMinutes; public SeekableStreamIndexTaskIOConfig( @Nullable final Integer taskGroupId, // can be null for backward compabitility @@ -53,7 +53,7 @@ public SeekableStreamIndexTaskIOConfig( final DateTime minimumMessageTime, final DateTime maximumMessageTime, @Nullable final InputFormat inputFormat, - @Nullable final Integer refreshRejectionPeriodsInMinutes // can be null for backward compabitility + @Nullable final Long refreshRejectionPeriodsInMinutes // can be null for backward compabitility ) { this.taskGroupId = taskGroupId; @@ -140,7 +140,7 @@ public InputFormat getInputFormat() @Nullable @JsonProperty - public Integer getRefreshRejectionPeriodsInMinutes() + public Long getRefreshRejectionPeriodsInMinutes() { return refreshRejectionPeriodsInMinutes; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 4e38a952999b..cd169864a527 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -2111,8 +2111,8 @@ protected abstract void possiblyResetDataSourceMetadata( private void refreshMinMaxMessageTime() { - minMessageTime = minMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes()); - maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes()); + minMessageTime = minMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); + maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); } public boolean withinMinMaxRecordTime(final InputRow row) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java index c0ed6984f19c..d3b69d438d5f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerAuthTest.java @@ -382,7 +382,7 @@ public TestSeekableStreamIndexTaskIOConfig() DateTimes.nowUtc().minusDays(2), DateTimes.nowUtc(), new CsvInputFormat(null, null, true, null, 0, null), - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java index bd18e47a2c76..f78fd680e36f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -40,7 +40,6 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.server.security.AuthorizerMapper; import org.joda.time.DateTime; -import org.joda.time.Duration; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -90,7 +89,7 @@ public void testWithinMinMaxTime() DateTime now = DateTimes.nowUtc(); - Mockito.when(ioConfig.getTaskDuration()).thenReturn(Duration.standardHours(2)); + Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(120L); Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().plusHours(2))); Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.of(DateTimes.nowUtc().minusHours(2))); Mockito.when(ioConfig.getInputFormat()).thenReturn(new JsonInputFormat(null, null, null, null, null)); @@ -142,7 +141,7 @@ public void testWithinMinMaxTimeNotPopulated() DateTime now = DateTimes.nowUtc(); - Mockito.when(ioConfig.getTaskDuration()).thenReturn(null); + Mockito.when(ioConfig.getRefreshRejectionPeriodsInMinutes()).thenReturn(null); // min max time not populated. Mockito.when(ioConfig.getMaximumMessageTime()).thenReturn(Optional.absent()); Mockito.when(ioConfig.getMinimumMessageTime()).thenReturn(Optional.absent()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 4ce4a95a7650..5f5aeb0ca853 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -197,7 +197,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat(), - ioConfig.getTaskDuration() + ioConfig.getTaskDuration().getStandardMinutes() ) { }; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 6edf77f53c71..8a3d36ea67fe 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2808,7 +2808,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat(), - ioConfig.getTaskDuration() + ioConfig.getTaskDuration().getStandardMinutes() ) { }; @@ -3168,7 +3168,7 @@ private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt( minimumMessageTime, maximumMessageTime, ioConfig.getInputFormat(), - ioConfig.getTaskDuration() + ioConfig.getTaskDuration().getStandardMinutes() ) { }; From 112eb9d3dc4d741ac88e82a1c66bcf5599b5e1be Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Fri, 8 Nov 2024 10:07:47 -0600 Subject: [PATCH 11/12] fixes --- .../rabbitstream/RabbitStreamIndexTaskIOConfig.java | 5 ++--- .../supervisor/RabbitStreamSupervisor.java | 2 +- .../indexing/kinesis/KinesisIndexTaskIOConfig.java | 9 ++++----- .../indexing/kinesis/supervisor/KinesisSupervisor.java | 2 +- .../druid/indexing/kinesis/KinesisIOConfigTest.java | 2 +- .../indexing/kinesis/KinesisIndexTaskSerdeTest.java | 2 +- .../druid/indexing/kinesis/KinesisIndexTaskTest.java | 10 +++++----- .../kinesis/supervisor/KinesisSupervisorTest.java | 2 +- 8 files changed, 16 insertions(+), 18 deletions(-) diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java index 4c8b0d292eda..721e66f6f3af 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java @@ -28,7 +28,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.joda.time.DateTime; -import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Map; @@ -55,7 +54,7 @@ public RabbitStreamIndexTaskIOConfig( @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("inputFormat") @Nullable InputFormat inputFormat, @JsonProperty("uri") String uri, - @JsonProperty("taskDuration") Duration taskDuration + @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes ) { super( @@ -67,7 +66,7 @@ public RabbitStreamIndexTaskIOConfig( minimumMessageTime, maximumMessageTime, inputFormat, - taskDuration.getStandardMinutes() + refreshRejectionPeriodsInMinutes ); this.pollTimeout = pollTimeout != null ? pollTimeout : RabbitStreamSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS; diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index d0c1045141ca..5e9d3c3d2c6a 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -202,7 +202,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( maximumMessageTime, ioConfig.getInputFormat(), rabbitConfig.getUri(), - ioConfig.getTaskDuration() + ioConfig.getTaskDuration().getStandardMinutes() ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 60f1d1230e81..2df59bbca35f 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -28,7 +28,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.joda.time.DateTime; -import org.joda.time.Duration; import javax.annotation.Nullable; import java.util.Set; @@ -80,7 +79,7 @@ public KinesisIndexTaskIOConfig( @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, @JsonProperty("awsExternalId") String awsExternalId, - @JsonProperty("taskDuration") Duration taskDuration + @JsonProperty("refreshRejectionPeriodsInMinutes") Long refreshRejectionPeriodsInMinutes ) { super( @@ -92,7 +91,7 @@ public KinesisIndexTaskIOConfig( minimumMessageTime, maximumMessageTime, inputFormat, - taskDuration.getStandardMinutes() + refreshRejectionPeriodsInMinutes ); Preconditions.checkArgument( getEndSequenceNumbers().getPartitionSequenceNumberMap() @@ -121,7 +120,7 @@ public KinesisIndexTaskIOConfig( Integer fetchDelayMillis, String awsAssumedRoleArn, String awsExternalId, - Duration taskDuration + Long refreshRejectionPeriodsInMinutes ) { this( @@ -140,7 +139,7 @@ public KinesisIndexTaskIOConfig( fetchDelayMillis, awsAssumedRoleArn, awsExternalId, - taskDuration + refreshRejectionPeriodsInMinutes ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 16ff751a4891..2391b1265bab 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -146,7 +146,7 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( ioConfig.getFetchDelayMillis(), ioConfig.getAwsAssumedRoleArn(), ioConfig.getAwsExternalId(), - ioConfig.getTaskDuration() + ioConfig.getTaskDuration().getStandardMinutes() ); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java index a917d9a43a9c..000be2830d6e 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java @@ -269,7 +269,7 @@ public void testDeserializeToOldIoConfig() throws IOException 2000, "awsAssumedRoleArn", "awsExternalId", - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ); final byte[] json = mapper.writeValueAsBytes(currentConfig); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 5648255deddf..ea4431c212dc 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -96,7 +96,7 @@ public class KinesisIndexTaskSerdeTest null, null, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ); private static final String ACCESS_KEY = "test-access-key"; private static final String SECRET_KEY = "test-secret-key"; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 757f6735a700..ae7f3b59a081 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -787,7 +787,7 @@ public void testRunWithMinimumMessageTime() throws Exception null, null, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -850,7 +850,7 @@ public void testRunWithMaximumMessageTime() throws Exception null, null, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ) ); @@ -1950,7 +1950,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception null, null, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ), context ); @@ -2113,7 +2113,7 @@ public void testSequencesFromContext() throws IOException null, null, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ), context ); @@ -2315,7 +2315,7 @@ private KinesisIndexTask createTask( null, null, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ), null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index aec9cae152fd..dc10e348138a 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -5565,7 +5565,7 @@ private KinesisIndexTask createKinesisIndexTask( null, null, null, - Duration.standardHours(2) + Duration.standardHours(2).getStandardMinutes() ), Collections.emptyMap(), false, From 0a0434c371015d6dfb5f9ebc24c5c4f04cfab699 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Thu, 14 Nov 2024 01:14:27 -0600 Subject: [PATCH 12/12] comments --- docs/ingestion/supervisor.md | 2 +- .../SeekableStreamIndexTaskRunner.java | 2 ++ .../src/test/resources/log4j2.xml | 27 ------------------- 3 files changed, 3 insertions(+), 28 deletions(-) delete mode 100644 indexing-service/src/test/resources/log4j2.xml diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index a07558389fae..0abdf7f2c3c2 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -61,7 +61,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka I/O confi |`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|`PT30M`| |`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No|| |`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline. You can specify only one of the late message rejection properties.|No|| -|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.|No|| +|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover.|No|| #### Task autoscaler diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index cd169864a527..42dcef39bc85 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -2113,6 +2113,8 @@ private void refreshMinMaxMessageTime() { minMessageTime = minMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); maxMessageTime = maxMessageTime.plusMinutes(ioConfig.getRefreshRejectionPeriodsInMinutes().intValue()); + + log.info(StringUtils.format("Updated min and max messsage times to %s and %s respectively.", minMessageTime, maxMessageTime)); } public boolean withinMinMaxRecordTime(final InputRow row) diff --git a/indexing-service/src/test/resources/log4j2.xml b/indexing-service/src/test/resources/log4j2.xml deleted file mode 100644 index ca81b189289d..000000000000 --- a/indexing-service/src/test/resources/log4j2.xml +++ /dev/null @@ -1,27 +0,0 @@ - - - - - - - - - -