diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 66339a31297a..703106611ba7 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -51,7 +51,6 @@ import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; -import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.task.AbstractTask; @@ -73,6 +72,7 @@ import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import io.druid.segment.realtime.firehose.ChatHandler; import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.plumber.RejectionPolicy; import io.druid.timeline.DataSegment; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -142,6 +142,7 @@ public enum Status private volatile Thread runThread = null; private volatile boolean stopRequested = false; private volatile boolean publishOnStop = false; + private volatile RejectionPolicy rejectionPolicy; // The pause lock and associated conditions are to support coordination between the Jetty threads and the main // ingestion loop. 
The goal is to provide callers of the API a guarantee that if pause() returns successfully @@ -191,6 +192,7 @@ public KafkaIndexTask( this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap()); + rejectionPolicy = tuningConfig.getRejectionPolicyFactory().create(tuningConfig.getWindowPeriod()); } private static String makeTaskId(String dataSource, int randomBits) @@ -411,20 +413,25 @@ public boolean apply(Throwable input) try { final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row"); - final SegmentIdentifier identifier = driver.add( - row, - sequenceNames.get(record.partition()), - committerSupplier - ); - if (identifier == null) { - // Failure to allocate segment puts determinism at risk, bail out to be safe. - // May want configurable behavior here at some point. - // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. - throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); - } + if (rejectionPolicy.accept(row.getTimestampFromEpoch())) { + final SegmentIdentifier identifier = driver.add( + row, + sequenceNames.get(record.partition()), + committerSupplier + ); - fireDepartmentMetrics.incrementProcessed(); + if (identifier == null) { + // Failure to allocate segment puts determinism at risk, bail out to be safe. + // May want configurable behavior here at some point. + // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. 
+ throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); + } + + fireDepartmentMetrics.incrementProcessed(); + } else { + fireDepartmentMetrics.incrementThrownAway(); + } } catch (ParseException e) { if (tuningConfig.isReportParseExceptions()) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index 374b2dec909c..f42691663346 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -25,6 +25,8 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.TuningConfig; import io.druid.segment.realtime.appenderator.AppenderatorConfig; +import io.druid.segment.realtime.plumber.NoopRejectionPolicyFactory; +import io.druid.segment.realtime.plumber.RejectionPolicyFactory; import org.joda.time.Period; import java.io.File; @@ -42,6 +44,8 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private final boolean buildV9Directly; private final boolean reportParseExceptions; private final long handoffConditionTimeout; + private final RejectionPolicyFactory rejectionPolicyFactory; + private final Period windowPeriod; @JsonCreator public KafkaTuningConfig( @@ -53,7 +57,9 @@ public KafkaTuningConfig( @JsonProperty("indexSpec") IndexSpec indexSpec, @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, - @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout + @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, + @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory, + @JsonProperty("windowPeriod") Period windowPeriod ) { // Cannot be a 
static because default basePersistDirectory is unique per-instance @@ -74,6 +80,10 @@ public KafkaTuningConfig( this.handoffConditionTimeout = handoffConditionTimeout == null ? defaults.getHandoffConditionTimeout() : handoffConditionTimeout; + this.rejectionPolicyFactory = rejectionPolicyFactory == null + ? new NoopRejectionPolicyFactory() + : rejectionPolicyFactory; + this.windowPeriod = windowPeriod == null ? new Period("P1D") : windowPeriod; } @JsonProperty @@ -130,6 +140,18 @@ public long getHandoffConditionTimeout() return handoffConditionTimeout; } + @JsonProperty("rejectionPolicy") + public RejectionPolicyFactory getRejectionPolicyFactory() + { + return rejectionPolicyFactory; + } + + @JsonProperty + public Period getWindowPeriod() + { + return windowPeriod; + } + public KafkaTuningConfig withBasePersistDirectory(File dir) { return new KafkaTuningConfig( @@ -141,7 +163,9 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) indexSpec, buildV9Directly, reportParseExceptions, - handoffConditionTimeout + handoffConditionTimeout, + rejectionPolicyFactory, + windowPeriod ); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 016d924337a7..cd115effa12a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import io.druid.guice.annotations.Json; -import io.druid.indexing.kafka.KafkaIndexTaskClient; import io.druid.indexing.kafka.KafkaIndexTaskClientFactory; import io.druid.indexing.kafka.KafkaTuningConfig; import 
io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -62,7 +61,7 @@ public KafkaSupervisorSpec( this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema"); this.tuningConfig = tuningConfig != null ? tuningConfig - : new KafkaTuningConfig(null, null, null, null, null, null, null, null, null); + : new KafkaTuningConfig(null, null, null, null, null, null, null, null, null, null, null); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); this.taskStorage = taskStorage; diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 8f51b68ddb80..4a5656e5c827 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1192,7 +1192,9 @@ private KafkaIndexTask createTask( null, buildV9Directly, reportParseExceptions, - handoffConditionTimeout + handoffConditionTimeout, + null, + null ); return new KafkaIndexTask( taskId, diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java new file mode 100644 index 000000000000..2528e998f129 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -0,0 +1,114 @@ +/* + * + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + + package io.druid.indexing.kafka; + + import com.fasterxml.jackson.databind.Module; + import com.fasterxml.jackson.databind.ObjectMapper; + import io.druid.jackson.DefaultObjectMapper; + import io.druid.segment.IndexSpec; + import io.druid.segment.indexing.TuningConfig; + import io.druid.segment.realtime.plumber.NoopRejectionPolicyFactory; + import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; + import org.joda.time.Period; + import org.junit.Assert; + import org.junit.Test; + + import java.io.File; + + public class KafkaTuningConfigTest + { + private final ObjectMapper mapper; + + public KafkaTuningConfigTest() + { + mapper = new DefaultObjectMapper(); + mapper.registerModules((Iterable) new KafkaIndexTaskModule().getJacksonModules()); + } + + @Test + public void testSerdeWithDefaults() throws Exception + { + String jsonStr = "{\"type\": \"kafka\"}"; + + KafkaTuningConfig config = (KafkaTuningConfig) mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + TuningConfig.class + ) + ), + TuningConfig.class + ); + + Assert.assertNotNull(config.getBasePersistDirectory()); + Assert.assertEquals(75000, config.getMaxRowsInMemory()); + Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); + Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); + Assert.assertEquals(0, config.getMaxPendingPersists()); + 
Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(false, config.getBuildV9Directly()); + Assert.assertEquals(false, config.isReportParseExceptions()); + Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertTrue(config.getRejectionPolicyFactory() instanceof NoopRejectionPolicyFactory); + Assert.assertEquals(new Period("P1D"), config.getWindowPeriod()); + } + + @Test + public void testSerdeWithNonDefaults() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"basePersistDirectory\": \"/tmp/xxx\",\n" + + " \"maxRowsInMemory\": 100,\n" + + " \"maxRowsPerSegment\": 100,\n" + + " \"intermediatePersistPeriod\": \"PT1H\",\n" + + " \"maxPendingPersists\": 100,\n" + + " \"buildV9Directly\": true,\n" + + " \"reportParseExceptions\": true,\n" + + " \"handoffConditionTimeout\": 100,\n" + + " \"rejectionPolicy\": {\"type\": \"serverTime\"},\n" + + " \"windowPeriod\": \"PT1H\"\n" + + "}"; + + KafkaTuningConfig config = (KafkaTuningConfig) mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + TuningConfig.class + ) + ), + TuningConfig.class + ); + + Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); + Assert.assertEquals(100, config.getMaxRowsInMemory()); + Assert.assertEquals(100, config.getMaxRowsPerSegment()); + Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); + Assert.assertEquals(100, config.getMaxPendingPersists()); + Assert.assertEquals(true, config.getBuildV9Directly()); + Assert.assertEquals(true, config.isReportParseExceptions()); + Assert.assertEquals(100, config.getHandoffConditionTimeout()); + Assert.assertTrue(config.getRejectionPolicyFactory() instanceof ServerTimeRejectionPolicyFactory); + Assert.assertEquals(new Period("PT1H"), config.getWindowPeriod()); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 31251c4944c2..0f83b08297a9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -162,6 +162,8 @@ public void setUp() throws Exception null, true, false, + null, + null, null ); }