@@ -168,7 +168,7 @@ For Roaring bitmaps:
|`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended so this flag will only be used on first run.|no (default == false)|
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
-|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T13:00Z* will be dropped.|no (default == none)|
+|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped.|no (default == none)|
|`skipOffsetGaps`|Boolean|Whether or not to allow gaps of missing offsets in the Kafka stream. This is required for compatibility with implementations such as MapR Streams which does not guarantee consecutive offsets. If this is false, an exception will be thrown if offsets are not consecutive.|no (default == false)|

## Supervisor API
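
The updated `earlyMessageRejectionPeriod` row describes a cutoff of task creation time plus `taskDuration` plus the rejection period. The arithmetic can be checked with a minimal sketch. It uses `java.time` rather than the Joda-Time types that appear in the supervisor code, and the class and method names (`EarlyRejectionCutoff`, `maximumMessageTime`) are hypothetical, chosen only for illustration.

```java
import java.time.Duration;
import java.time.Instant;

public class EarlyRejectionCutoff {

    // Hypothetical helper: cutoff = creation time + taskDuration + earlyMessageRejectionPeriod.
    // This mirrors the documented behavior; it is not the supervisor's actual code.
    static Instant maximumMessageTime(Instant taskCreated,
                                      Duration taskDuration,
                                      Duration earlyMessageRejectionPeriod) {
        return taskCreated.plus(taskDuration).plus(earlyMessageRejectionPeriod);
    }

    public static void main(String[] args) {
        // Values taken from the example in the documentation row.
        Instant created = Instant.parse("2016-01-01T12:00:00Z");
        Duration taskDuration = Duration.parse("PT1H");
        Duration rejectionPeriod = Duration.parse("PT1H");

        Instant cutoff = maximumMessageTime(created, taskDuration, rejectionPeriod);
        System.out.println(cutoff); // prints 2016-01-01T14:00:00Z
    }
}
```

Under the previous wording the cutoff for the same inputs would have been *2016-01-01T13:00Z*, because `taskDuration` was not added.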
@@ -499,6 +499,21 @@ public void run()

       fireDepartmentMetrics.incrementProcessed();
     } else {
+      if (log.isDebugEnabled()) {
+        if (beforeMinimumMessageTime) {
+          log.debug(
+              "CurrentTimeStamp[%s] is before MinimumMessageTime[%s]",
+              row.getTimestamp(),
+              ioConfig.getMinimumMessageTime().get()
+          );
+        } else if (afterMaximumMessageTime) {
+          log.debug(
+              "CurrentTimeStamp[%s] is after MaximumMessageTime[%s]",
+              row.getTimestamp(),
+              ioConfig.getMaximumMessageTime().get()
+          );
+        }
+      }
       fireDepartmentMetrics.incrementThrownAway();
     }
   }
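
The hunk above logs which bound caused a row to be thrown away, but the definitions of `beforeMinimumMessageTime` and `afterMaximumMessageTime` fall outside the visible context. The following is a minimal sketch of how such a check could look, assuming both bounds are optional and exclusive (a row exactly on a bound is kept). The class and method names are hypothetical, and `java.time` and `java.util.Optional` stand in for the Joda-Time and Guava types used in the task.

```java
import java.time.Instant;
import java.util.Optional;

public class MessageTimeFilter {

    // Hypothetical check: a row is ingested only if it is neither before the
    // minimum message time nor after the maximum message time. An absent
    // bound never rejects a row. Boundary handling here is an assumption.
    static boolean shouldIngest(Instant rowTimestamp,
                                Optional<Instant> minimumMessageTime,
                                Optional<Instant> maximumMessageTime) {
        boolean beforeMinimumMessageTime =
            minimumMessageTime.isPresent() && rowTimestamp.isBefore(minimumMessageTime.get());
        boolean afterMaximumMessageTime =
            maximumMessageTime.isPresent() && rowTimestamp.isAfter(maximumMessageTime.get());
        return !beforeMinimumMessageTime && !afterMaximumMessageTime;
    }

    public static void main(String[] args) {
        Optional<Instant> min = Optional.of(Instant.parse("2016-01-01T11:00:00Z"));
        Optional<Instant> max = Optional.of(Instant.parse("2016-01-01T14:00:00Z"));

        // Inside the window: counted as processed.
        System.out.println(shouldIngest(Instant.parse("2016-01-01T13:30:00Z"), min, max)); // prints true
        // Later than the maximum: counted as thrown away.
        System.out.println(shouldIngest(Instant.parse("2016-01-01T14:30:00Z"), min, max)); // prints false
    }
}
```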
@@ -1370,7 +1370,7 @@ void createNewTasks()
) : Optional.<DateTime>absent());

Optional<DateTime> maximumMessageTime = (ioConfig.getEarlyMessageRejectionPeriod().isPresent() ? Optional.of(
-    DateTimes.nowUtc().plus(ioConfig.getEarlyMessageRejectionPeriod().get())
+    DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus(ioConfig.getEarlyMessageRejectionPeriod().get())
) : Optional.<DateTime>absent());

taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime, maximumMessageTime));
@@ -436,11 +436,11 @@ public void testEarlyMessageRejectionPeriod() throws Exception

Assert.assertTrue(
"maximumMessageTime",
-    task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(59).isAfterNow()
+    task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(59 + 60).isAfterNow()
);
Assert.assertTrue(
"maximumMessageTime",
-    task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(61).isBeforeNow()
+    task1.getIOConfig().getMaximumMessageTime().get().minusMinutes(61 + 60).isBeforeNow()
);
Assert.assertEquals(
task1.getIOConfig().getMaximumMessageTime().get(),
@@ -861,7 +861,10 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception
// check that failed tasks are recreated with the same minimumMessageTime as the task it replaced, even if that
// task came from another supervisor
Assert.assertEquals(now, ((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMinimumMessageTime().get());
-    Assert.assertEquals(maxi, ((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime().get());
+    Assert.assertEquals(
+        maxi,
+        ((KafkaIndexTask) aNewTaskCapture.getValue()).getIOConfig().getMaximumMessageTime().get()
+    );
}

@Test