Automatic pendingSegments cleanup #5149
jon-wei
left a comment
Did an initial review, will revisit this later today
| } | ||
| } | ||
|
|
||
| private List<TaskStatus> getRecentlyFinishedTaskSTatusesSince(long start, Ordering<TaskStuff> createdDateDesc) |
There's an extra capitalized T:
getRecentlyFinishedTaskSTatusesSince should be getRecentlyFinishedTaskStatusesSince
| for (ImmutableDruidDataSource dataSource : params.getDataSources()) { | ||
| if (!params.getCoordinatorDynamicConfig().getKillPendingSegmentsSkipList().contains(dataSource.getName())) { | ||
| log.info( | ||
| "Kill pendingSegments created until [%s] for dataSource[%s]", |
The string format arguments are in reverse order here
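For illustration, a minimal sketch of the intended argument order, using plain String.format rather than Druid's logger. The class name, method name, and sample values are hypothetical; only the format string comes from the diff above.

```java
import java.time.Instant;

// Hypothetical helper, not part of the patch: shows the arguments passed in
// the same order as the placeholders in the format string.
final class KillLogMessageSketch {
    static String message(Instant createdUntil, String dataSource) {
        return String.format(
            "Kill pendingSegments created until [%s] for dataSource[%s]",
            createdUntil, // fills the first %s (the cutoff time)
            dataSource    // fills the second %s (the datasource name)
        );
    }

    public static void main(String[] args) {
        System.out.println(message(Instant.parse("2018-01-01T00:00:00Z"), "wikipedia"));
    }
}
```

Swapping the two arguments still compiles, because both placeholders are %s, which is why this kind of mistake is easy to miss.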
| DruidCoordinatorSegmentKiller.class | ||
| ).addConditionBinding( | ||
| "druid.coordinator.kill.pendingSegments.on", | ||
| predicate -> Objects.equals(predicate, "true"), |
Maybe use Predicates.equalTo("true") to be consistent with the other bindings here
| } | ||
|
|
||
| @JsonProperty | ||
| public Set<String> getKillPendingSegmentsSkipList() |
Can you add a comment that explains this is used to prevent automatic pending segment deletion for some datasources?
I was a bit confused initially and thought the skip list was referring to this: https://en.wikipedia.org/wiki/Skip_list
|
|
||
| Preconditions.checkArgument( | ||
| !deleteInterval.overlaps(activeTaskInterval), | ||
| "Cannot delete pendingSegments because there is at least one running task created at %s", |
hm, I wonder if throwing an exception here is too aggressive, if I understand correctly:
- Since tasks can take a varying amount of time to complete, it seems like it wouldn't be too uncommon to have a situation where there are active tasks, created before the most recently completed task, that are still running
- Would it be better to just return 0 here without an exception, or maybe use minCreatedDateOfActiveTasks as the end of the pending segment deletion interval?
There was a problem hiding this comment.
Good point. I changed this to gather all incomplete tasks plus the last complete task and find the earliest createdTime. This means fewer exceptions occur when the coordinator kills pending segments.
I think throwing an exception here is fine now because it should occur only rarely: 1) if there is a bug in DruidCoordinatorCleanupPendingSegments, or 2) if someone calls the overlord API directly with a wrong interval.
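A rough sketch of the check being discussed, assuming java.time.Instant in place of Joda types and a hypothetical class name. It is not the code in this patch; it only illustrates rejecting a delete interval whose end reaches past the earliest createdTime of the tracked tasks.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not the patch's implementation.
final class DeleteIntervalCheckSketch {
    // Earliest creation time among the given tasks, if there are any.
    static Optional<Instant> earliest(List<Instant> createdTimes) {
        return createdTimes.stream().min(Comparator.naturalOrder());
    }

    // Throws if the (exclusive) end of the delete interval is later than the
    // earliest task creation time, i.e. the interval would overlap live tasks.
    static void checkDeletable(Instant deleteEnd, List<Instant> taskCreatedTimes) {
        Optional<Instant> minCreated = earliest(taskCreatedTimes);
        if (minCreated.isPresent() && deleteEnd.isAfter(minCreated.get())) {
            throw new IllegalArgumentException(
                "Cannot delete pendingSegments because there is at least one running task created at "
                + minCreated.get()
            );
        }
    }

    public static void main(String[] args) {
        List<Instant> created = List.of(Instant.parse("2018-01-05T00:00:00Z"));
        checkDeletable(Instant.parse("2018-01-04T00:00:00Z"), created); // passes silently
    }
}
```

If the coordinator always derives the end of its kill interval from the same minimum, the exception should only fire for direct API calls with a bad interval or for a bug in the caller.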
| Interval tryInterval, | ||
| Interval rowInterval, | ||
| boolean logOnFail, | ||
| boolean skipSegmentLineageCheck |
skipSegmentLineageCheck can be removed from IndexerMetadataStorageConnector's allocatePendingSegment too.
There was a problem hiding this comment.
Would you elaborate on this? I guess you mean IndexerMetadataStorageCoordinator, but skipSegmentLineageCheck is used in IndexerSQLMetadataStorageCoordinator.allocatePendingSegment().
| } | ||
| } | ||
|
|
||
| private List<TaskStatus> getRecentlyFinishedTaskSTatusesSince(long start, Ordering<TaskStuff> createdDateDesc) |
Capitalization is a bit weird on TaskSTatuses
| } | ||
| } | ||
| ); | ||
| final List<TaskStatusPlus> completeTasks = recentlyFinishedTasks |
Please retain the comment // Would be nice to include the real created date, but the TaskStorage API doesn't yet allow it. (or even make it possible!)
There was a problem hiding this comment.
Added the comment. I'm thinking of refactoring some classes related to TaskStatus. The APIs to get createdDate and queueInsertionTime will be added when that's done!
| @Produces(MediaType.APPLICATION_JSON) | ||
| public Response killPendingSegments( | ||
| @PathParam("dataSource") String dataSource, | ||
| @QueryParam("interval") String deleteIntervalString, |
Would it work to make this an Interval rather than String? We do have a jackson deserializer set up from json strings to Druid Intervals.
There was a problem hiding this comment.
The javadoc of QueryParam says:
The type T of the annotated parameter, field or property must either:
1. Be a primitive type
2. Have a constructor that accepts a single String argument
3. Have a static method named valueOf or fromString that accepts a single String argument (see, for example, Integer.valueOf(String))
4. Be List<T>, Set<T> or SortedSet<T>, where T satisfies 2 or 3 above. The resulting collection is read-only.
I guess the deleteInterval would have to be passed in the HTTP message body rather than as a QueryParam or PathParam (PathParam has requirements similar to QueryParam's) to make this type-safe. If so, the current approach looks better to me because it's simple.
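To make the String-based approach concrete, here is a minimal sketch of validating an ISO-8601 "start/end" interval string by hand. It assumes java.time.Instant instead of the Joda Interval that Druid uses, and the class and method names are hypothetical.

```java
import java.time.Instant;

// Hypothetical sketch: parses "start/end" where both sides are ISO-8601
// instants, e.g. "2017-01-01T00:00:00Z/2017-02-01T00:00:00Z".
final class IntervalParamSketch {
    // Returns {start, end}; throws IllegalArgumentException on a malformed
    // shape and DateTimeParseException on an unparseable timestamp.
    static Instant[] parse(String interval) {
        String[] parts = interval.split("/", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected start/end but got: " + interval);
        }
        Instant start = Instant.parse(parts[0]);
        Instant end = Instant.parse(parts[1]);
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("End is before start: " + interval);
        }
        return new Instant[] {start, end};
    }

    public static void main(String[] args) {
        Instant[] parsed = parse("2017-01-01T00:00:00Z/2017-02-01T00:00:00Z");
        System.out.println(parsed[0] + " to " + parsed[1]);
    }
}
```

Keeping the parameter as a String means the resource method is responsible for turning a parse failure into a client error response.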
| .map(task -> taskStorageQueryAdapter.getCreatedTime(task.getId())) | ||
| .min(Comparator.naturalOrder()); | ||
|
|
||
| final Interval activeTaskInterval = new Interval( |
I think we should include some kind of buffer zone here to account for the fact that clocks are not guaranteed to be in sync. I'm not sure how much grace period is needed but probably somewhere between 10 minutes and 24 hours. What do you think?
There was a problem hiding this comment.
I guess you mean we need a buffer when the coordinator kills pending segments. Or do you mean the buffer is needed when IndexerMetadataStorageAdapter checks whether the given interval overlaps the createdDate of any running tasks?
The former one sounds good. I added a buffer of 24 hours and now the endDate of the interval for killing pendingSegments is decided by min(createdDates of running/pending/waiting/complete tasks, DateTimes.nowUtc()).
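A small sketch of that calculation, under the assumption that the 24-hour buffer is subtracted from the minimum (the comment above does not spell out exactly how the buffer is applied). It uses java.time instead of Joda, and the class name, constant name, and method name are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical sketch of picking the end of the kill interval.
final class KillEndTimeSketch {
    // Assumed grace period to tolerate clock skew between nodes.
    static final Duration BUFFER = Duration.ofHours(24);

    // min(createdDates of tracked tasks, now), moved back by the buffer.
    static Instant killEnd(List<Instant> taskCreatedTimes, Instant now) {
        Instant earliest = now;
        for (Instant created : taskCreatedTimes) {
            if (created.isBefore(earliest)) {
                earliest = created;
            }
        }
        return earliest.minus(BUFFER);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-01-20T00:00:00Z");
        List<Instant> created = List.of(
            Instant.parse("2018-01-10T00:00:00Z"),
            Instant.parse("2018-01-05T00:00:00Z"));
        System.out.println(killEnd(created, now));
    }
}
```

Starting from now means the result is well defined even when no tasks are known.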
| ); | ||
|
|
||
| if (!authResult.isAllowed()) { | ||
| throw new ForbiddenException(authResult.toString()); |
authResult.getMessage() would make more sense I think. Also, now that I look at AuthResult, it seems that setMessage is never called and so it could be removed and message made final. It's unrelated to this patch but still a nice cleanup.
@gianm Good point. I agree.
LGTM, can you fix the CI errors?
note that this configuration is not documented in the 0.12.0 docs
Thank you for the report! Raised #5563.
| } | ||
| createdTimes.sort(Comparators.naturalNullsFirst()); | ||
|
|
||
| // There should be at least one createdTime because the current time is added to the 'createdTimes' list if there |
Is this comment accurate? I don't see where it's implemented, which suggests that this helper can crash in a brand new cluster.
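One way to make the helper safe on a brand new cluster, sketched with java.time and hypothetical names: skip null createdTimes and fall back to the current time when nothing remains. This illustrates the guard the code comment describes and is not taken from the patch.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch of a crash-free "earliest createdTime" lookup.
final class CreatedTimesSketch {
    // Returns the earliest non-null createdTime, or 'now' if the list is
    // empty or contains only nulls (e.g. no tasks have ever been submitted).
    static Instant earliestOrNow(List<Instant> createdTimes, Instant now) {
        return createdTimes.stream()
            .filter(Objects::nonNull)
            .min(Comparator.naturalOrder())
            .orElse(now);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-01-20T00:00:00Z");
        System.out.println(earliestOrNow(List.of(), now));
    }
}
```

Using orElse avoids indexing into a sorted list, so an empty list can never throw.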
With this patch, the coordinator periodically deletes pendingSegments from the pendingSegments table. You can test by adding druid.coordinator.kill.pendingSegments.on=true to your coordinator configuration file.
Additionally, I added TaskStatusPlus, which contains taskId, createdTime, queueInsertionTime, taskState, runningDuration, and taskLocation. We currently have 4 similar classes, i.e., TaskStatus, TaskRunnerWorkItem, TaskResponseObject in OverlordResource, and TaskResponseObject for integration tests, but they are designed to be used in some specific classes. I would love to add a new class to represent all of these and remove the others, but that would cause a lot of code changes, which makes it difficult to do together in this PR. So, in this PR,
- TaskStatusPlus is added to druid-api. This is to use the same class when coordinators call overlord APIs.
- The Status enum in TaskStatus is extracted as a separate TaskState enumeration and moved to druid-api.
- TaskLocation is moved to druid-api.