Consolidate RetrieveSegmentsToReplaceAction into RetrieveUsedSegmentsAction#15699
Conversation
kfaraz
left a comment
There was a problem hiding this comment.
We should also remove the other task action RetrieveSegmentsToReplaceAction as that is not needed anymore.
A rolling upgrade would upgrade MMs first, so we would not have an issue of firing a task action that the OL doesn't recognize.
| // Defaulting to the former behaviour when visibility wasn't explicitly specified for backward compatibility | ||
| this.visibility = visibility != null ? visibility : Segments.ONLY_VISIBLE; | ||
|
|
||
| this.replace = replace != null ? replace : false; |
There was a problem hiding this comment.
Can this not be determined automatically while running the action? Only tasks with a REPLACE lock would have this as true, right?
There was a problem hiding this comment.
Thanks, I think this can always be true.
If we need to fetch all the unfiltered segments for a task holding replace locks, we could use a task action client created using a different task that holds no locks.
| @Override | ||
| public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox) | ||
| { | ||
| if (!replace) { |
There was a problem hiding this comment.
Cleaner to do:
if (isReplaceTask()) {
return newMethodWhichDoesTheRightThingForReplaceTask();
} else {
return retrieveUsedSegments(toolbox);
}
private boolean isReplaceTask() {
return replace && task.getDatasource().equals(dataSource);
}
There was a problem hiding this comment.
The new method should also have a javadoc saying that it returns a consistent view of segments and why it is needed.
There was a problem hiding this comment.
I think the current structure makes it more readable as it is obvious that we are using the old logic whenever possible before trying to filter the segments using the locks.
if (datasourceToReplace != datasourceToRead) {
return retrieveUsedSegments();
}
Set<ReplaceLock> replaceLocks = fetchReplaceLocksForTask();
if (replaceLocks.isEmpty()) {
return retrieveUsedSegments()
}
return retrieveUsedSegmentsCreatedBeforeReplaceVersions(replaceLocks);
| final String supervisorId; | ||
| if (task instanceof AbstractBatchSubtask) { | ||
| supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId(); | ||
| } else { | ||
| supervisorId = task.getId(); | ||
| } |
There was a problem hiding this comment.
For later, can we confirm if this logic is really needed? I think the task action is always fired using the supervisor task ID.
| allSegmentsToBeReplaced.addAll(createdAndSegments.getValue()); | ||
| } else { | ||
| for (DataSegment segment : createdAndSegments.getValue()) { | ||
| log.info("Ignoring segment[%s] as it has created_date[%s] greater than the REPLACE lock version[%s]", |
There was a problem hiding this comment.
Instead of logging all segment IDs separately, add them to a set and just log once.
There was a problem hiding this comment.
The intent was to keep it verbose with the segment id and created date with the replace version available in each log.
There was a problem hiding this comment.
I fear it might end up being too verbose. Better to just log the replace lock version for each interval and point out that anything newer than that would not be considered.
The segment IDs if needed can go in a debug log.
kfaraz
left a comment
There was a problem hiding this comment.
Changes look okay, left minor comments and MSQ tests need to be handled.
| dataSource, | ||
| intervals | ||
| )); | ||
| // Additional check as the task action does not accept empty intervals |
There was a problem hiding this comment.
Don't think this comment is really needed or accurate. The real reason we are not firing a task action if the intervals is empty because we know we would get back an empty result. Why perform an unnecessary round trip?
| publishedUsedSegments = context.taskActionClient().submit(new RetrieveUsedSegmentsAction( | ||
| dataSource, | ||
| intervals | ||
| )); |
There was a problem hiding this comment.
style:
| publishedUsedSegments = context.taskActionClient().submit(new RetrieveUsedSegmentsAction( | |
| dataSource, | |
| intervals | |
| )); | |
| publishedUsedSegments = context.taskActionClient().submit( | |
| new RetrieveUsedSegmentsAction(dataSource, intervals) | |
| ); |
| return retrieveUsedSegments(toolbox); | ||
| } | ||
|
|
||
| Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = new HashMap<>(); |
There was a problem hiding this comment.
I recall putting a comment here but can't find it anywhere.
I would prefer it if we moved the code here on down into a new method
retrieveUsedSegmentsForReplace(toolbox, replaceLocks).
…Action (apache#15699) Consolidate RetrieveSegmentsToReplaceAction into RetrieveUsedSegmentsAction
…Action (apache#15699) Consolidate RetrieveSegmentsToReplaceAction into RetrieveUsedSegmentsAction
This PR aims to let the original RetrieveUsedSegmentsAction work with REPLACE locks by introducing a boolean flag.
When the task does not hold any REPLACE locks, the behaviour is identical to the existing behaviour.
However, when there are REPLACE locks, only segments that were created before the times corresponding to the versions of the locks will be fetched for their respective intervals.
Using the original action helps with rolling upgrades where the flag is simply ignored when the overlord has not been upgraded.
It also helps eliminate the need for the undocumented parameter introduced in #15430