From b224eebed34bb7240614e0c0afdcbbbdf4890e43 Mon Sep 17 00:00:00 2001 From: dclim Date: Fri, 27 May 2016 15:16:40 -0600 Subject: [PATCH 1/2] add KafkaIndexTask support for minimumMessageTime --- .../druid/indexing/kafka/KafkaIOConfig.java | 14 ++- .../druid/indexing/kafka/KafkaIndexTask.java | 32 +++-- .../kafka/supervisor/KafkaSupervisor.java | 3 +- .../indexing/kafka/KafkaIndexTaskTest.java | 111 +++++++++++++++--- .../indexing/kafka/KafkaTuningConfigTest.java | 104 ++++++++++++++++ .../kafka/supervisor/KafkaSupervisorTest.java | 3 +- 6 files changed, 232 insertions(+), 35 deletions(-) create mode 100644 extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index d1ec3df3d483..6307d77ed12b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -21,8 +21,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import io.druid.segment.indexing.IOConfig; +import org.joda.time.DateTime; import java.util.Map; @@ -37,6 +39,7 @@ public class KafkaIOConfig implements IOConfig private final Map consumerProperties; private final boolean useTransaction; private final boolean pauseAfterRead; + private final Optional minimumMessageTime; @JsonCreator public KafkaIOConfig( @@ -45,7 +48,8 @@ public KafkaIOConfig( @JsonProperty("endPartitions") KafkaPartitions endPartitions, @JsonProperty("consumerProperties") Map consumerProperties, @JsonProperty("useTransaction") Boolean useTransaction, - @JsonProperty("pauseAfterRead") Boolean 
pauseAfterRead + @JsonProperty("pauseAfterRead") Boolean pauseAfterRead, + @JsonProperty("minimumMessageTime") DateTime minimumMessageTime ) { this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName"); @@ -54,6 +58,7 @@ public KafkaIOConfig( this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ; + this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); Preconditions.checkArgument( startPartitions.getTopic().equals(endPartitions.getTopic()), @@ -111,6 +116,12 @@ public boolean isPauseAfterRead() return pauseAfterRead; } + @JsonProperty + public Optional getMinimumMessageTime() + { + return minimumMessageTime; + } + @Override public String toString() { @@ -121,6 +132,7 @@ public String toString() ", consumerProperties=" + consumerProperties + ", useTransaction=" + useTransaction + ", pauseAfterRead=" + pauseAfterRead + + ", minimumMessageTime=" + minimumMessageTime + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 66339a31297a..4eaca8ea6ecd 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -51,7 +51,6 @@ import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; -import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import 
io.druid.indexing.common.task.AbstractTask; @@ -411,20 +410,27 @@ public boolean apply(Throwable input) try { final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row"); - final SegmentIdentifier identifier = driver.add( - row, - sequenceNames.get(record.partition()), - committerSupplier - ); - if (identifier == null) { - // Failure to allocate segment puts determinism at risk, bail out to be safe. - // May want configurable behavior here at some point. - // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. - throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); - } + if (!ioConfig.getMinimumMessageTime().isPresent() || + !ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) { + + final SegmentIdentifier identifier = driver.add( + row, + sequenceNames.get(record.partition()), + committerSupplier + ); - fireDepartmentMetrics.incrementProcessed(); + if (identifier == null) { + // Failure to allocate segment puts determinism at risk, bail out to be safe. + // May want configurable behavior here at some point. + // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. 
+ throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); + } + + fireDepartmentMetrics.incrementProcessed(); + } else { + fireDepartmentMetrics.incrementThrownAway(); + } } catch (ParseException e) { if (tuningConfig.isReportParseExceptions()) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 5daca56b2e31..1bb6c8d0c56a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -978,7 +978,8 @@ private void createKafkaTasksForGroup(int groupId, int replicas) new KafkaPartitions(ioConfig.getTopic(), endPartitions), consumerProperties, true, - false + false, + null ); for (int i = 0; i < replicas; i++) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 8f51b68ddb80..4f7f57131f86 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -117,6 +117,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.easymock.EasyMock; +import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.After; @@ -301,7 +302,8 @@ public void testRunAfterDataInserted() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -341,7 +343,8 @@ 
public void testRunBeforeDataInserted() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -382,6 +385,59 @@ public void testRunBeforeDataInserted() throws Exception Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); } + @Test(timeout = 60_000L) + public void testRunWithMinimumMessageTime() throws Exception + { + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 0L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true, + false, + new DateTime("2010") + ), + null + ); + + final ListenableFuture future = runTask(task); + + // Wait for the task to start reading + while (task.getStatus() != KafkaIndexTask.Status.READING) { + Thread.sleep(10); + } + + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + // Wait for task to exit + Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(2, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), 
readSegmentDim1(desc2)); + } + @Test(timeout = 60_000L) public void testRunOnNothing() throws Exception { @@ -400,7 +456,8 @@ public void testRunOnNothing() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -439,7 +496,8 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -489,7 +547,8 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -538,7 +597,8 @@ public void testReportParseExceptions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -569,7 +629,8 @@ public void testRunReplicas() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -581,7 +642,8 @@ public void testRunReplicas() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -633,7 +695,8 @@ public void testRunConflicting() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -645,7 +708,8 @@ public void testRunConflicting() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -698,7 +762,8 @@ public void testRunConflictingWithoutTransactions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), false, - false + false, + null ), null ); @@ -710,7 +775,8 
@@ public void testRunConflictingWithoutTransactions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), kafkaServer.consumerProperties(), false, - false + false, + null ), null ); @@ -768,7 +834,8 @@ public void testRunOneTaskTwoPartitions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -823,7 +890,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -835,7 +903,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(1, 1L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -889,7 +958,8 @@ public void testRestore() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -922,7 +992,8 @@ public void testRestore() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -972,7 +1043,8 @@ public void testRunWithPauseAndResume() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false + false, + null ), null ); @@ -1053,7 +1125,8 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)), kafkaServer.consumerProperties(), true, - true + true, + null ), null ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java new file mode 100644 index 000000000000..a77cf44a299d --- /dev/null +++ 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.segment.IndexSpec; +import io.druid.segment.indexing.TuningConfig; +import org.joda.time.DateTime; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; + +public class KafkaTuningConfigTest +{ + private final ObjectMapper mapper; + + public KafkaTuningConfigTest() + { + mapper = new DefaultObjectMapper(); + mapper.registerModules((Iterable) new KafkaIndexTaskModule().getJacksonModules()); + } + + @Test + public void testSerdeWithDefaults() throws Exception + { + String jsonStr = "{\"type\": \"kafka\"}"; + + KafkaTuningConfig config = (KafkaTuningConfig) mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + TuningConfig.class + ) + ), + TuningConfig.class + ); + + Assert.assertNotNull(config.getBasePersistDirectory()); + Assert.assertEquals(75000, 
config.getMaxRowsInMemory()); + Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); + Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); + Assert.assertEquals(0, config.getMaxPendingPersists()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(false, config.getBuildV9Directly()); + Assert.assertEquals(false, config.isReportParseExceptions()); + Assert.assertEquals(0, config.getHandoffConditionTimeout()); + } + + @Test + public void testSerdeWithNonDefaults() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"basePersistDirectory\": \"/tmp/xxx\",\n" + + " \"maxRowsInMemory\": 100,\n" + + " \"maxRowsPerSegment\": 100,\n" + + " \"intermediatePersistPeriod\": \"PT1H\",\n" + + " \"maxPendingPersists\": 100,\n" + + " \"buildV9Directly\": true,\n" + + " \"reportParseExceptions\": true,\n" + + " \"handoffConditionTimeout\": 100\n" + + "}"; + + KafkaTuningConfig config = (KafkaTuningConfig) mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + TuningConfig.class + ) + ), + TuningConfig.class + ); + + Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); + Assert.assertEquals(100, config.getMaxRowsInMemory()); + Assert.assertEquals(100, config.getMaxRowsPerSegment()); + Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); + Assert.assertEquals(100, config.getMaxPendingPersists()); + Assert.assertEquals(true, config.getBuildV9Directly()); + Assert.assertEquals(true, config.isReportParseExceptions()); + Assert.assertEquals(100, config.getHandoffConditionTimeout()); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 31251c4944c2..62b9d193d7d9 100644 --- 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1063,7 +1063,8 @@ private KafkaIndexTask createKafkaIndexTask( endPartitions, ImmutableMap.of(), true, - false + false, + null ), ImmutableMap.of(), null From 213ee95a715f47a8d8f461dd937d85b0d4205009 Mon Sep 17 00:00:00 2001 From: dclim Date: Fri, 27 May 2016 18:43:42 -0600 Subject: [PATCH 2/2] add Kafka supervisor support for lateMessageRejectionPeriod --- .../extensions-core/kafka-ingestion.md | 1 + .../kafka/supervisor/KafkaSupervisor.java | 46 +++- .../supervisor/KafkaSupervisorIOConfig.java | 14 +- .../indexing/kafka/KafkaIOConfigTest.java | 248 ++++++++++++++++++ .../KafkaSupervisorIOConfigTest.java | 158 +++++++++++ .../kafka/supervisor/KafkaSupervisorTest.java | 196 +++++++++++--- 6 files changed, 620 insertions(+), 43 deletions(-) create mode 100644 extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java create mode 100644 extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 5b8e69551063..7b782a6bbf8f 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -139,6 +139,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`period`|ISO8601 Period|How often the supervisor will execute its management logic. 
Note that the supervisor will also run in response to certain events (such as tasks succeeding, failing, and reaching their taskDuration) so this value specifies the maximum time between iterations.|no (default == PT30S)| |`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.|no (default == false)| |`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)| +|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. 
a realtime and a nightly batch ingestion pipeline).|no (default == none)| ## Supervisor API diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 1bb6c8d0c56a..2fb2ce1dd13f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -37,8 +37,8 @@ import com.metamx.common.ISE; import com.metamx.emitter.EmittingLogger; import io.druid.concurrent.Execs; -import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskInfoProvider; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; @@ -113,11 +113,13 @@ private class TaskGroup final Map partitionOffsets; final Map tasks = new HashMap<>(); + final Optional minimumMessageTime; DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action - public TaskGroup(Map partitionOffsets) + public TaskGroup(Map partitionOffsets, Optional minimumMessageTime) { this.partitionOffsets = partitionOffsets; + this.minimumMessageTime = minimumMessageTime; } } @@ -450,6 +452,9 @@ String generateSequenceName(int groupId) } String partitionOffsetStr = sb.toString().substring(1); + Optional minimumMessageTime = taskGroups.get(groupId).minimumMessageTime; + String minMsgTimeStr = (minimumMessageTime.isPresent() ? 
String.valueOf(minimumMessageTime.get().getMillis()) : ""); + String dataSchema, tuningConfig; try { dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema()); @@ -459,7 +464,8 @@ String generateSequenceName(int groupId) throw Throwables.propagate(e); } - String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr).substring(0, 15); + String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr + minMsgTimeStr) + .substring(0, 15); return Joiner.on("_").join("index_kafka", dataSource, hashCode); } @@ -599,11 +605,19 @@ private void discoverTasks() log.debug("Creating new task group [%d]", taskGroupId); taskGroups.put( taskGroupId, - new TaskGroup(kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()) + new TaskGroup( + kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap(), + kafkaTask.getIOConfig().getMinimumMessageTime() + ) ); } - taskGroups.get(taskGroupId).tasks.put(taskId, new TaskData()); + if (!isTaskCurrent(taskGroupId, taskId)) { + log.info("Stopping task [%s] which does not match the expected parameters and ingestion spec", taskId); + stopTask(taskId, false); + } else { + taskGroups.get(taskGroupId).tasks.put(taskId, new TaskData()); + } } } } @@ -635,7 +649,11 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups( } log.info("Creating new pending completion task group for discovered task [%s]", taskId); - TaskGroup newTaskGroup = new TaskGroup(startingPartitions); + + // reading the minimumMessageTime from the publishing task and setting it here is not necessary as this task cannot + // change to a state where it will read any more events + TaskGroup newTaskGroup = new TaskGroup(startingPartitions, Optional.absent()); + newTaskGroup.tasks.put(taskId, new TaskData()); newTaskGroup.completionTimeout = DateTime.now().plus(ioConfig.getCompletionTimeout()); @@ -889,7 +907,8 @@ private void checkCurrentTaskState() TaskGroup taskGroup = taskGroupEntry.getValue(); 
// Iterate the list of known tasks in this group and: - // 1) Kill any tasks which are not "current" (have the partitions and starting offsets in [taskGroups] + // 1) Kill any tasks which are not "current" (have the partitions, starting offsets, and minimumMessageTime + // (if applicable) in [taskGroups]) // 2) Remove any tasks that have failed from the list // 3) If any task completed successfully, stop all the tasks in this group and move to the next group @@ -933,7 +952,12 @@ void createNewTasks() for (Integer groupId : partitionGroups.keySet()) { if (!taskGroups.containsKey(groupId)) { log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId).keySet()); - taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId))); + + Optional minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of( + DateTime.now().minus(ioConfig.getLateMessageRejectionPeriod().get()) + ) : Optional.absent()); + + taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime)); } } @@ -971,6 +995,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) String sequenceName = generateSequenceName(groupId); Map consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties()); + DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull(); KafkaIOConfig kafkaIOConfig = new KafkaIOConfig( sequenceName, @@ -979,7 +1004,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) consumerProperties, true, false, - null + minimumMessageTime ); for (int i = 0; i < replicas; i++) { @@ -1099,7 +1124,8 @@ private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOf /** * Compares the sequence name from the task with one generated for the task's group ID and returns false if they do - * not match. 
The sequence name is generated from a hash of the dataSchema, tuningConfig, and starting offsets. + * not match. The sequence name is generated from a hash of the dataSchema, tuningConfig, starting offsets, and the + * minimumMessageTime if set. */ private boolean isTaskCurrent(int taskGroupId, String taskId) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index b53427d1506b..5833cd6482db 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.joda.time.Duration; import org.joda.time.Period; @@ -40,6 +41,7 @@ public class KafkaSupervisorIOConfig private final Duration period; private final Boolean useEarliestOffset; private final Duration completionTimeout; + private final Optional lateMessageRejectionPeriod; @JsonCreator public KafkaSupervisorIOConfig( @@ -51,7 +53,8 @@ public KafkaSupervisorIOConfig( @JsonProperty("startDelay") Period startDelay, @JsonProperty("period") Period period, @JsonProperty("useEarliestOffset") Boolean useEarliestOffset, - @JsonProperty("completionTimeout") Period completionTimeout + @JsonProperty("completionTimeout") Period completionTimeout, + @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod ) { this.topic = Preconditions.checkNotNull(topic, "topic"); @@ -68,6 +71,9 @@ public KafkaSupervisorIOConfig( this.period = defaultDuration(period, "PT30S"); this.useEarliestOffset = (useEarliestOffset != null 
? useEarliestOffset : false); this.completionTimeout = defaultDuration(completionTimeout, "PT30M"); + this.lateMessageRejectionPeriod = (lateMessageRejectionPeriod == null + ? Optional.absent() + : Optional.of(lateMessageRejectionPeriod.toStandardDuration())); } @JsonProperty @@ -124,6 +130,12 @@ public Duration getCompletionTimeout() return completionTimeout; } + @JsonProperty + public Optional getLateMessageRejectionPeriod() + { + return lateMessageRejectionPeriod; + } + private static Duration defaultDuration(final Period period, final String theDefault) { return (period == null ? new Period(theDefault) : period).toStandardDuration(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java new file mode 100644 index 000000000000..de63ab14904e --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.kafka; + +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.segment.indexing.IOConfig; +import org.hamcrest.CoreMatchers; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class KafkaIOConfigTest +{ + private final ObjectMapper mapper; + + public KafkaIOConfigTest() + { + mapper = new DefaultObjectMapper(); + mapper.registerModules((Iterable) new KafkaIndexTaskModule().getJacksonModules()); + } + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Test + public void testSerdeWithDefaults() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"baseSequenceName\": \"my-sequence-name\",\n" + + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n" + + "}"; + + KafkaIOConfig config = (KafkaIOConfig) mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + IOConfig.class + ) + ), IOConfig.class + ); + + Assert.assertEquals("my-sequence-name", config.getBaseSequenceName()); + Assert.assertEquals("mytopic", config.getStartPartitions().getTopic()); + Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 10L), config.getStartPartitions().getPartitionOffsetMap()); + Assert.assertEquals("mytopic", config.getEndPartitions().getTopic()); + Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap()); + Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), 
config.getConsumerProperties()); + Assert.assertEquals(true, config.isUseTransaction()); + Assert.assertEquals(false, config.isPauseAfterRead()); + Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent()); + } + + @Test + public void testSerdeWithNonDefaults() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"baseSequenceName\": \"my-sequence-name\",\n" + + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"useTransaction\": false,\n" + + " \"pauseAfterRead\": true,\n" + + " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + + KafkaIOConfig config = (KafkaIOConfig) mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + IOConfig.class + ) + ), IOConfig.class + ); + + Assert.assertEquals("my-sequence-name", config.getBaseSequenceName()); + Assert.assertEquals("mytopic", config.getStartPartitions().getTopic()); + Assert.assertEquals(ImmutableMap.of(0, 1L, 1, 10L), config.getStartPartitions().getPartitionOffsetMap()); + Assert.assertEquals("mytopic", config.getEndPartitions().getTopic()); + Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap()); + Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); + Assert.assertEquals(false, config.isUseTransaction()); + Assert.assertEquals(true, config.isPauseAfterRead()); + Assert.assertEquals(new DateTime("2016-05-31T12:00Z"), config.getMinimumMessageTime().get()); + } + + @Test + public void testBaseSequenceNameRequired() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + + " 
\"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"useTransaction\": false,\n" + + " \"pauseAfterRead\": true,\n" + + " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(NullPointerException.class)); + exception.expectMessage(CoreMatchers.containsString("baseSequenceName")); + mapper.readValue(jsonStr, IOConfig.class); + } + + @Test + public void testStartPartitionsRequired() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"baseSequenceName\": \"my-sequence-name\",\n" + + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"useTransaction\": false,\n" + + " \"pauseAfterRead\": true,\n" + + " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(NullPointerException.class)); + exception.expectMessage(CoreMatchers.containsString("startPartitions")); + mapper.readValue(jsonStr, IOConfig.class); + } + + @Test + public void testEndPartitionsRequired() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"baseSequenceName\": \"my-sequence-name\",\n" + + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"useTransaction\": false,\n" + + " \"pauseAfterRead\": true,\n" + + " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(NullPointerException.class)); + exception.expectMessage(CoreMatchers.containsString("endPartitions")); + 
mapper.readValue(jsonStr, IOConfig.class); + } + + @Test + public void testConsumerPropertiesRequired() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"baseSequenceName\": \"my-sequence-name\",\n" + + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + + " \"useTransaction\": false,\n" + + " \"pauseAfterRead\": true,\n" + + " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(NullPointerException.class)); + exception.expectMessage(CoreMatchers.containsString("consumerProperties")); + mapper.readValue(jsonStr, IOConfig.class); + } + + @Test + public void testStartAndEndTopicMatch() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"baseSequenceName\": \"my-sequence-name\",\n" + + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + + " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"useTransaction\": false,\n" + + " \"pauseAfterRead\": true,\n" + + " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(IllegalArgumentException.class)); + exception.expectMessage(CoreMatchers.containsString("start topic and end topic must match")); + mapper.readValue(jsonStr, IOConfig.class); + } + + @Test + public void testStartAndEndPartitionSetMatch() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"baseSequenceName\": \"my-sequence-name\",\n" + + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + + " \"endPartitions\": 
{\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"useTransaction\": false,\n" + + " \"pauseAfterRead\": true,\n" + + " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(IllegalArgumentException.class)); + exception.expectMessage(CoreMatchers.containsString("start partition set and end partition set must match")); + mapper.readValue(jsonStr, IOConfig.class); + } + + @Test + public void testEndOffsetGreaterThanStart() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"baseSequenceName\": \"my-sequence-name\",\n" + + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"useTransaction\": false,\n" + + " \"pauseAfterRead\": true,\n" + + " \"minimumMessageTime\": \"2016-05-31T12:00Z\"\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(IllegalArgumentException.class)); + exception.expectMessage(CoreMatchers.containsString("end offset must be >= start offset")); + mapper.readValue(jsonStr, IOConfig.class); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java new file mode 100644 index 000000000000..e94f4a740feb --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.kafka.supervisor; + +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.druid.indexing.kafka.KafkaIndexTaskModule; +import io.druid.jackson.DefaultObjectMapper; +import org.hamcrest.CoreMatchers; +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class KafkaSupervisorIOConfigTest +{ + private final ObjectMapper mapper; + + public KafkaSupervisorIOConfigTest() + { + mapper = new DefaultObjectMapper(); + mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules()); + } + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Test + public void testSerdeWithDefaults() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n" + + "}"; + + KafkaSupervisorIOConfig config = mapper.readValue( + mapper.writeValueAsString( +
mapper.readValue( + jsonStr, + KafkaSupervisorIOConfig.class + ) + ), KafkaSupervisorIOConfig.class + ); + + Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertEquals(1, (int) config.getReplicas()); + Assert.assertEquals(1, (int) config.getTaskCount()); + Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration()); + Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); + Assert.assertEquals(Duration.standardSeconds(5), config.getStartDelay()); + Assert.assertEquals(Duration.standardSeconds(30), config.getPeriod()); + Assert.assertEquals(false, config.isUseEarliestOffset()); + Assert.assertEquals(Duration.standardMinutes(30), config.getCompletionTimeout()); + Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent()); + } + + @Test + public void testSerdeWithNonDefaults() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"replicas\": 3,\n" + + " \"taskCount\": 9,\n" + + " \"taskDuration\": \"PT30M\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"startDelay\": \"PT1M\",\n" + + " \"period\": \"PT10S\",\n" + + " \"useEarliestOffset\": true,\n" + + " \"completionTimeout\": \"PT45M\",\n" + + " \"lateMessageRejectionPeriod\": \"PT1H\"\n" + + "}"; + + KafkaSupervisorIOConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + KafkaSupervisorIOConfig.class + ) + ), KafkaSupervisorIOConfig.class + ); + + Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertEquals(3, (int) config.getReplicas()); + Assert.assertEquals(9, (int) config.getTaskCount()); + Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration()); + Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); + Assert.assertEquals(Duration.standardMinutes(1), 
config.getStartDelay()); + Assert.assertEquals(Duration.standardSeconds(10), config.getPeriod()); + Assert.assertEquals(true, config.isUseEarliestOffset()); + Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout()); + Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get()); + } + + @Test + public void testTopicRequired() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(NullPointerException.class)); + exception.expectMessage(CoreMatchers.containsString("topic")); + mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); + } + + @Test + public void testConsumerPropertiesRequired() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\"\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(NullPointerException.class)); + exception.expectMessage(CoreMatchers.containsString("consumerProperties")); + mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); + } + + @Test + public void testBootstrapServersRequired() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"consumerProperties\": {}\n" + + "}"; + + exception.expect(JsonMappingException.class); + exception.expectCause(CoreMatchers.isA(NullPointerException.class)); + exception.expectMessage(CoreMatchers.containsString("bootstrap.servers")); + mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 62b9d193d7d9..f1c29aeb6d03 100644 --- 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -36,8 +36,8 @@ import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularities; -import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskInfoProvider; +import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.Task; @@ -181,7 +181,7 @@ public void tearDown() throws Exception @Test public void testNoInitialState() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H"); + supervisor = getSupervisor(1, 1, true, "PT1H", null); addSomeEvents(1); Capture captured = Capture.newInstance(); @@ -211,6 +211,7 @@ public void testNoInitialState() throws Exception Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead()); + Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent()); Assert.assertEquals(KAFKA_TOPIC, taskConfig.getStartPartitions().getTopic()); Assert.assertEquals(0L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0)); @@ -226,7 +227,7 @@ public void testNoInitialState() throws Exception @Test public void testMultiTask() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H"); + supervisor = getSupervisor(1, 2, true, "PT1H", null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -263,7 +264,7 @@ public void testMultiTask() throws Exception @Test public void testReplicas() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H"); + 
supervisor = getSupervisor(2, 1, true, "PT1H", null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -297,13 +298,52 @@ public void testReplicas() throws Exception Assert.assertEquals(0L, (long) task2.getIOConfig().getStartPartitions().getPartitionOffsetMap().get(2)); } + @Test + public void testLateMessageRejectionPeriod() throws Exception + { + supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H")); + addSomeEvents(1); + + Capture captured = Capture.newInstance(CaptureType.ALL); + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true).times(2); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + KafkaIndexTask task1 = captured.getValues().get(0); + KafkaIndexTask task2 = captured.getValues().get(1); + + Assert.assertTrue( + "minimumMessageTime", + task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(59).isBeforeNow() + ); + Assert.assertTrue( + "minimumMessageTime", + task1.getIOConfig().getMinimumMessageTime().get().plusMinutes(61).isAfterNow() + ); + Assert.assertEquals( + task1.getIOConfig().getMinimumMessageTime().get(), + task2.getIOConfig().getMinimumMessageTime().get() + ); + } + @Test /** * Test generating the starting offsets from the partition high water marks in Kafka. 
*/ public void testLatestOffset() throws Exception { - supervisor = getSupervisor(1, 1, false, "PT1H"); + supervisor = getSupervisor(1, 1, false, "PT1H", null); addSomeEvents(1100); Capture captured = Capture.newInstance(); @@ -335,7 +375,7 @@ public void testLatestOffset() throws Exception */ public void testDatasourceMetadata() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H"); + supervisor = getSupervisor(1, 1, true, "PT1H", null); addSomeEvents(100); Capture captured = Capture.newInstance(); @@ -365,7 +405,7 @@ public void testDatasourceMetadata() throws Exception @Test(expected = ISE.class) public void testBadMetadataOffsets() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H"); + supervisor = getSupervisor(1, 1, true, "PT1H", null); addSomeEvents(1); expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); @@ -384,7 +424,7 @@ public void testBadMetadataOffsets() throws Exception @Test public void testKillIncompatibleTasks() throws Exception { - supervisor = getSupervisor(2, 1, true, "PT1H"); + supervisor = getSupervisor(2, 1, true, "PT1H", null); addSomeEvents(1); Task id1 = createKafkaIndexTask( // unexpected # of partitions (kill) @@ -392,7 +432,8 @@ public void testKillIncompatibleTasks() throws Exception DATASOURCE, "index_kafka_testDS__some_other_sequenceName", new KafkaPartitions("topic", ImmutableMap.of(0, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(0, 10L)) + new KafkaPartitions("topic", ImmutableMap.of(0, 10L)), + null ); Task id2 = createKafkaIndexTask( // correct number of partitions and ranges (don't kill) @@ -400,7 +441,8 @@ public void testKillIncompatibleTasks() throws Exception DATASOURCE, "sequenceName-0", new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)) + new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)), + null ); Task id3 = createKafkaIndexTask( // unexpected 
range on partition 2 (kill) @@ -408,7 +450,8 @@ public void testKillIncompatibleTasks() throws Exception DATASOURCE, "index_kafka_testDS__some_other_sequenceName", new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)), - new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)) + new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)), + null ); Task id4 = createKafkaIndexTask( // different datasource (don't kill) @@ -416,7 +459,8 @@ public void testKillIncompatibleTasks() throws Exception "other-datasource", "index_kafka_testDS_d927edff33c4b3f", new KafkaPartitions("topic", ImmutableMap.of(0, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(0, 10L)) + new KafkaPartitions("topic", ImmutableMap.of(0, 10L)), + null ); Task id5 = new RealtimeIndexTask( // non KafkaIndexTask (don't kill) @@ -465,7 +509,7 @@ public void testKillIncompatibleTasks() throws Exception @Test public void testKillBadPartitionAssignment() throws Exception { - supervisor = getSupervisor(1, 2, true, "PT1H"); + supervisor = getSupervisor(1, 2, true, "PT1H", null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -473,35 +517,40 @@ public void testKillBadPartitionAssignment() throws Exception DATASOURCE, "sequenceName-0", new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)) + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null ); Task id2 = createKafkaIndexTask( "id2", DATASOURCE, "sequenceName-1", new KafkaPartitions("topic", ImmutableMap.of(1, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)) + new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)), + null ); Task id3 = createKafkaIndexTask( "id3", DATASOURCE, "sequenceName-0", new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 
2, Long.MAX_VALUE)) + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null ); Task id4 = createKafkaIndexTask( "id4", DATASOURCE, "sequenceName-0", new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE)) + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE)), + null ); Task id5 = createKafkaIndexTask( "id5", DATASOURCE, "sequenceName-0", new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE)) + new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null ); List existingTasks = ImmutableList.of(id1, id2, id3, id4, id5); @@ -542,7 +591,7 @@ public void testKillBadPartitionAssignment() throws Exception @Test public void testRequeueTaskWhenFailed() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H"); + supervisor = getSupervisor(2, 2, true, "PT1H", null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -608,10 +657,82 @@ public void testRequeueTaskWhenFailed() throws Exception ); } + @Test + public void testRequeueAdoptedTaskWhenFailed() throws Exception + { + supervisor = getSupervisor(2, 1, true, "PT1H", null); + addSomeEvents(1); + + DateTime now = DateTime.now(); + Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + "sequenceName-0", + new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + now + ); + + List existingTasks = ImmutableList.of(id1); + + Capture captured = Capture.newInstance(); + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + 
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskClient.getStartTime(anyString(), eq(false))).andThrow(taskClient.new NoTaskLocationException("test-id")) + .anyTimes(); + expect(taskQueue.add(capture(captured))).andReturn(true); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + // check that replica tasks are created with the same minimumMessageTime as tasks inherited from another supervisor + Assert.assertEquals(now, ((KafkaIndexTask) captured.getValue()).getIOConfig().getMinimumMessageTime().get()); + + // test that a task failing causes a new task to be re-queued with the same parameters + String runningTaskId = captured.getValue().getId(); + Capture aNewTaskCapture = Capture.newInstance(); + KafkaIndexTask iHaveFailed = (KafkaIndexTask) existingTasks.get(0); + reset(taskStorage); + reset(taskQueue); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); + expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId()))); + expect(taskStorage.getStatus(runningTaskId)).andReturn(Optional.of(TaskStatus.running(runningTaskId))).anyTimes(); + expect(taskStorage.getTask(iHaveFailed.getId())).andReturn(Optional.of((Task) iHaveFailed)).anyTimes(); + expect(taskStorage.getTask(runningTaskId)).andReturn(Optional.of(captured.getValue())).anyTimes(); + 
expect(taskQueue.add(capture(aNewTaskCapture))).andReturn(true); + replay(taskStorage); + replay(taskQueue); + + supervisor.runInternal(); + verifyAll(); + + Assert.assertNotEquals(iHaveFailed.getId(), aNewTaskCapture.getValue().getId()); + Assert.assertEquals( + iHaveFailed.getIOConfig().getBaseSequenceName(), + ((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getBaseSequenceName() + ); + + // check that failed tasks are recreated with the same minimumMessageTime as the task it replaced, even if that + // task came from another supervisor + Assert.assertEquals(now, ((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime().get()); + } + @Test public void testQueueNextTasksOnSuccess() throws Exception { - supervisor = getSupervisor(2, 2, true, "PT1H"); + supervisor = getSupervisor(2, 2, true, "PT1H", null); addSomeEvents(1); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -680,7 +801,7 @@ public void testBeginPublishAndQueueNextTasks() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234); - supervisor = getSupervisor(2, 2, true, "PT1M"); + supervisor = getSupervisor(2, 2, true, "PT1M", null); addSomeEvents(100); Capture captured = Capture.newInstance(CaptureType.ALL); @@ -762,7 +883,7 @@ public void testDiscoverExistingPublishingTask() throws Exception { final TaskLocation location = new TaskLocation("testHost", 1234); - supervisor = getSupervisor(1, 1, true, "PT1H"); + supervisor = getSupervisor(1, 1, true, "PT1H", null); addSomeEvents(1); Task task = createKafkaIndexTask( @@ -770,7 +891,8 @@ public void testDiscoverExistingPublishingTask() throws Exception DATASOURCE, "sequenceName-0", new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)) + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null ); Collection 
workItems = new ArrayList<>(); @@ -851,7 +973,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception final TaskLocation location2 = new TaskLocation("testHost2", 145); final DateTime startTime = new DateTime(); - supervisor = getSupervisor(1, 1, true, "PT1H"); + supervisor = getSupervisor(1, 1, true, "PT1H", null); addSomeEvents(1); Task id1 = createKafkaIndexTask( @@ -859,7 +981,8 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception DATASOURCE, "sequenceName-0", new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), - new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)) + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null ); Task id2 = createKafkaIndexTask( @@ -867,7 +990,8 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception DATASOURCE, "sequenceName-0", new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), - new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)) + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null ); Collection workItems = new ArrayList<>(); @@ -932,7 +1056,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception @Test(expected = IllegalStateException.class) public void testStopNotStarted() throws Exception { - supervisor = getSupervisor(1, 1, true, "PT1H"); + supervisor = getSupervisor(1, 1, true, "PT1H", null); supervisor.stop(false); } @@ -943,7 +1067,7 @@ public void testStop() throws Exception taskRunner.unregisterListener(String.format("KafkaSupervisor-%s", DATASOURCE)); replayAll(); - supervisor = getSupervisor(1, 1, true, "PT1H"); + supervisor = getSupervisor(1, 1, true, "PT1H", null); supervisor.start(); supervisor.stop(false); @@ -968,7 +1092,13 @@ private void addSomeEvents(int 
numEventsPerPartition) throws Exception } } - private KafkaSupervisor getSupervisor(int replicas, int taskCount, boolean useEarliestOffset, String duration) + private KafkaSupervisor getSupervisor( + int replicas, + int taskCount, + boolean useEarliestOffset, + String duration, + Period lateMessageRejectionPeriod + ) { KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( KAFKA_TOPIC, @@ -979,7 +1109,8 @@ private KafkaSupervisor getSupervisor(int replicas, int taskCount, boolean useEa new Period("P1D"), new Period("PT30S"), useEarliestOffset, - new Period("PT30M") + new Period("PT30M"), + lateMessageRejectionPeriod ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(null, null) @@ -1049,7 +1180,8 @@ private KafkaIndexTask createKafkaIndexTask( String dataSource, String sequenceName, KafkaPartitions startPartitions, - KafkaPartitions endPartitions + KafkaPartitions endPartitions, + DateTime minimumMessageTime ) { return new KafkaIndexTask( @@ -1064,7 +1196,7 @@ private KafkaIndexTask createKafkaIndexTask( ImmutableMap.of(), true, false, - null + minimumMessageTime ), ImmutableMap.of(), null