From 21794352f9f906cb665f9def27602f809f4e3f94 Mon Sep 17 00:00:00 2001 From: liuboyu Date: Fri, 20 Oct 2017 16:08:01 +0800 Subject: [PATCH] define earlyMessegeRejectPeriod as the period after the taskduration --- .../extensions-core/kafka-ingestion.md | 2 +- .../io/druid/indexing/kafka/KafkaIndexTask.java | 15 +++++++++++++++ .../kafka/supervisor/KafkaSupervisor.java | 2 +- .../kafka/supervisor/KafkaSupervisorTest.java | 9 ++++++--- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 7212d18b55cb..5a8d661d648a 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -168,7 +168,7 @@ For Roaring bitmaps: |`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.|no (default == false)| |`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)| |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)| -|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T13:00Z* will be dropped.|no (default == none)| +|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped.|no (default == none)| |`skipOffsetGaps`|Boolean|Whether or not to allow gaps of missing offsets in the Kafka stream. This is required for compatibility with implementations such as MapR Streams which does not guarantee consecutive offsets. If this is false, an exception will be thrown if offsets are not consecutive.|no (default == false)| ## Supervisor API diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 145ba31e5ed7..17a379a6f473 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -499,6 +499,21 @@ public void run() fireDepartmentMetrics.incrementProcessed(); } else { + if (log.isDebugEnabled()) { + if (beforeMinimumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]", + row.getTimestamp(), + ioConfig.getMinimumMessageTime().get() + ); + } else if (afterMaximumMessageTime) { + log.debug( + "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]", + row.getTimestamp(), + ioConfig.getMaximumMessageTime().get() + ); + } + } fireDepartmentMetrics.incrementThrownAway(); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 29804f2f3662..302077fc15a1 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1370,7 +1370,7 @@ void createNewTasks() ) : Optional.absent()); Optional maximumMessageTime = (ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of( - DateTimes.nowUtc().plus(ioConfig.getEarlyMessageRejectionPeriod().get()) + DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get()) ) : Optional.absent()); taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime, maximumMessageTime)); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index b01fd38268d7..96244ca559e1 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -436,11 +436,11 @@ public void testEarlyMessageRejectionPeriod() throws Exception Assert.assertTrue( "maximumMessageTime", - task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(59).isAfterNow() + task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(59 + 60).isAfterNow() ); Assert.assertTrue( "maximumMessageTime", - task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(61).isBeforeNow() + task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(61 + 60).isBeforeNow() ); Assert.assertEquals( task1.getIOConfig().getMaximumMessageTime().get(), @@ -861,7 +861,10 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception // check that failed tasks are recreated with the same minimumMessageTime as the task it replaced, even if that // task came from another supervisor Assert.assertEquals(now, ((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime().get()); - Assert.assertEquals(maxi, ((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime().get()); + Assert.assertEquals( + maxi, + ((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime().get() + ); } @Test