-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Kafka task minimum message time #3035
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,8 +37,8 @@ | |
| import com.metamx.common.ISE; | ||
| import com.metamx.emitter.EmittingLogger; | ||
| import io.druid.concurrent.Execs; | ||
| import io.druid.indexing.common.TaskLocation; | ||
| import io.druid.indexing.common.TaskInfoProvider; | ||
| import io.druid.indexing.common.TaskLocation; | ||
| import io.druid.indexing.common.TaskStatus; | ||
| import io.druid.indexing.common.task.Task; | ||
| import io.druid.indexing.common.task.TaskResource; | ||
|
|
@@ -113,11 +113,13 @@ private class TaskGroup | |
| final Map<Integer, Long> partitionOffsets; | ||
|
|
||
| final Map<String, TaskData> tasks = new HashMap<>(); | ||
| final Optional<DateTime> minimumMessageTime; | ||
| DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action | ||
|
|
||
| public TaskGroup(Map<Integer, Long> partitionOffsets) | ||
| public TaskGroup(Map<Integer, Long> partitionOffsets, Optional<DateTime> minimumMessageTime) | ||
| { | ||
| this.partitionOffsets = partitionOffsets; | ||
| this.minimumMessageTime = minimumMessageTime; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -450,6 +452,9 @@ String generateSequenceName(int groupId) | |
| } | ||
| String partitionOffsetStr = sb.toString().substring(1); | ||
|
|
||
| Optional<DateTime> minimumMessageTime = taskGroups.get(groupId).minimumMessageTime; | ||
| String minMsgTimeStr = (minimumMessageTime.isPresent() ? String.valueOf(minimumMessageTime.get().getMillis()) : ""); | ||
|
|
||
| String dataSchema, tuningConfig; | ||
| try { | ||
| dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema()); | ||
|
|
@@ -459,7 +464,8 @@ String generateSequenceName(int groupId) | |
| throw Throwables.propagate(e); | ||
| } | ||
|
|
||
| String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr).substring(0, 15); | ||
| String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr + minMsgTimeStr) | ||
| .substring(0, 15); | ||
|
|
||
| return Joiner.on("_").join("index_kafka", dataSource, hashCode); | ||
| } | ||
|
|
@@ -599,11 +605,19 @@ private void discoverTasks() | |
| log.debug("Creating new task group [%d]", taskGroupId); | ||
| taskGroups.put( | ||
| taskGroupId, | ||
| new TaskGroup(kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap()) | ||
| new TaskGroup( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, I see here is where discovery of non-publishing tasks is going to happen. |
||
| kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap(), | ||
| kafkaTask.getIOConfig().getMinimumMessageTime() | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| taskGroups.get(taskGroupId).tasks.put(taskId, new TaskData()); | ||
| if (!isTaskCurrent(taskGroupId, taskId)) { | ||
| log.info("Stopping task [%s] which does not match the expected parameters and ingestion spec", taskId); | ||
| stopTask(taskId, false); | ||
| } else { | ||
| taskGroups.get(taskGroupId).tasks.put(taskId, new TaskData()); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -635,7 +649,11 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups( | |
| } | ||
|
|
||
| log.info("Creating new pending completion task group for discovered task [%s]", taskId); | ||
| TaskGroup newTaskGroup = new TaskGroup(startingPartitions); | ||
|
|
||
| // reading the minimumMessageTime from the publishing task and setting it here is not necessary as this task cannot | ||
| // change to a state where it will read any more events | ||
| TaskGroup newTaskGroup = new TaskGroup(startingPartitions, Optional.<DateTime>absent()); | ||
|
|
||
| newTaskGroup.tasks.put(taskId, new TaskData()); | ||
| newTaskGroup.completionTimeout = DateTime.now().plus(ioConfig.getCompletionTimeout()); | ||
|
|
||
|
|
@@ -889,7 +907,8 @@ private void checkCurrentTaskState() | |
| TaskGroup taskGroup = taskGroupEntry.getValue(); | ||
|
|
||
| // Iterate the list of known tasks in this group and: | ||
| // 1) Kill any tasks which are not "current" (have the partitions and starting offsets in [taskGroups] | ||
| // 1) Kill any tasks which are not "current" (have the partitions, starting offsets, and minimumMessageTime | ||
| // (if applicable) in [taskGroups]) | ||
| // 2) Remove any tasks that have failed from the list | ||
| // 3) If any task completed successfully, stop all the tasks in this group and move to the next group | ||
|
|
||
|
|
@@ -933,7 +952,12 @@ void createNewTasks() | |
| for (Integer groupId : partitionGroups.keySet()) { | ||
| if (!taskGroups.containsKey(groupId)) { | ||
| log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId).keySet()); | ||
| taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId))); | ||
|
|
||
| Optional<DateTime> minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of( | ||
| DateTime.now().minus(ioConfig.getLateMessageRejectionPeriod().get()) | ||
| ) : Optional.<DateTime>absent()); | ||
|
|
||
| taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime)); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -971,14 +995,16 @@ private void createKafkaTasksForGroup(int groupId, int replicas) | |
| String sequenceName = generateSequenceName(groupId); | ||
|
|
||
| Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties()); | ||
| DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull(); | ||
|
|
||
| KafkaIOConfig kafkaIOConfig = new KafkaIOConfig( | ||
| sequenceName, | ||
| new KafkaPartitions(ioConfig.getTopic(), startPartitions), | ||
| new KafkaPartitions(ioConfig.getTopic(), endPartitions), | ||
| consumerProperties, | ||
| true, | ||
| false | ||
| false, | ||
| minimumMessageTime | ||
| ); | ||
|
|
||
| for (int i = 0; i < replicas; i++) { | ||
|
|
@@ -1098,7 +1124,8 @@ private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOf | |
|
|
||
| /** | ||
| * Compares the sequence name from the task with one generated for the task's group ID and returns false if they do | ||
| * not match. The sequence name is generated from a hash of the dataSchema, tuningConfig, and starting offsets. | ||
| * not match. The sequence name is generated from a hash of the dataSchema, tuningConfig, starting offsets, and the | ||
| * minimumMessageTime if set. | ||
| */ | ||
| private boolean isTaskCurrent(int taskGroupId, String taskId) | ||
| { | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC the comment on isTaskCurrent says what goes into this, and so should be updated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch