Run compaction as a supervisor on Overlord #16768
Conversation
```java
    return 0;
  }

  public enum State implements SupervisorStateManager.State
```
Check notice (Code scanning / CodeQL): Class has same name as super class
abhishekagarwal87 left a comment:

How much memory usage is going to increase on the Overlord after it starts doing the segment polling?
```java
 * search policy to skip an interval if it has been recently compacted or if it
 * keeps failing repeatedly.
 */
public class CompactionStatusTracker
```
There should be a way to forget the state, i.e. don't compact an interval if it's been failing, but then try again after X minutes.
That logic would reside in the policy itself.
I will write an experimental policy to demonstrate this usage.
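As a toy illustration of the "forget after X minutes" idea discussed above, a tracker could simply drop a recorded failure once a retry period has elapsed. This is only a sketch with hypothetical names (`ForgetfulFailureTracker`, `shouldSkip`), not actual Druid code:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: skip an interval that failed recently, but
 * "forget" the failure once the retry period has elapsed.
 */
public class ForgetfulFailureTracker
{
  private final Duration retryPeriod;
  private final Map<String, Instant> lastFailureTimes = new HashMap<>();

  public ForgetfulFailureTracker(Duration retryPeriod)
  {
    this.retryPeriod = retryPeriod;
  }

  public void recordFailure(String interval, Instant failedAt)
  {
    lastFailureTimes.put(interval, failedAt);
  }

  /** Returns true if the interval should still be skipped at time 'now'. */
  public boolean shouldSkip(String interval, Instant now)
  {
    final Instant lastFailure = lastFailureTimes.get(interval);
    if (lastFailure == null) {
      return false;
    }
    if (Duration.between(lastFailure, now).compareTo(retryPeriod) >= 0) {
      // Retry period has elapsed: forget the old failure and try again.
      lastFailureTimes.remove(interval);
      return false;
    }
    return true;
  }
}
```

In the actual design, this kind of decision would live inside the policy, which receives the task status from the tracker.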
How does this interface allow for policies that operate at the interval level, e.g. compacting intervals with the least compacted data first?
All policies are at the interval level, since we can either choose to compact an interval or not compact it at all.
(The current state of compaction in an interval can be partial though, if segments were appended to that interval after a prior round of compaction.)
The object SegmentsToCompact represents all segments to compact in a single interval.
Do you think some rename of the base policy would help?
```java
private Response buildErrorResponseWhenRunningAsSupervisor()
{
  return Response.status(Response.Status.SERVICE_UNAVAILABLE).entity(
      ImmutableMap.of(
          "error",
          "Compaction has been disabled on the Coordinator."
          + " Use Overlord APIs to fetch compaction status."
      )
  ).build();
}
```
This will be a problem during rolling config changes when you are going from Overlord-based compaction to Coordinator-based compaction. The Router will start issuing requests to the Coordinator, but the Coordinator will reject them till it has been restarted with the new property. Maybe we can return a redirect code (302) so that the Router can send the request to the Overlord?
Or we don't return an error but log a warning?
If we just log a warning, we might return stale information from the coordinator, without the client ever realizing it.
Returning 302 would have sufficed if the path were the same for the two resources, but one path has prefix `/druid/coordinator/v1/compaction` and the other has `/druid/indexer/v1/compaction`.
The Druid clients `DruidLeaderClient` and `ServiceClientImpl` handle redirects, but only update the service location and not the path.
Another option could be to just not do anything at the router, and simply use OverlordClient on the coordinator side and forward requests to the overlord if compaction supervisors are enabled.
Let me know what you think.
@abhishekagarwal87 , to deal with the issue with rolling upgrades, I decided to do this instead:
- The leader Overlord is the single source of truth on whether the feature is enabled.
- The API `/druid/indexer/v1/compaction/isSupervisorEnabled` can be used by the Coordinator to determine if the feature is enabled.
- If enabled, the Coordinator redirects the compaction status APIs to the leader Overlord using the `OverlordClient`.
- The Router does not perform any redirecting between Overlord and Coordinator.
- The Overlord does not redirect to the Coordinator. When the feature is disabled, the Overlord simply returns a 503 (Service Unavailable) with an appropriate error message. This is to keep things simple and to avoid getting into an infinite redirect situation.
The only reason we need the redirect is that a client (like web-console) using the Coordinator compaction status APIs continues to work seamlessly when we toggle the feature on or off. With a client explicitly using the new Overlord APIs, we don't have the burden of backward compatibility and thus no redirect is required.
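In rough Java, the Coordinator-side decision described here boils down to a small branch. The `OverlordClient` interface and method names below are simplified stand-ins for illustration, not the real Druid classes:

```java
/**
 * Illustrative sketch of the Coordinator-side routing decision:
 * serve compaction status locally when supervisors are disabled,
 * otherwise forward the call to the leader Overlord.
 */
public class CompactionStatusRouter
{
  /** Stand-in for the real Druid OverlordClient. */
  public interface OverlordClient
  {
    boolean isCompactionSupervisorEnabled();

    String getCompactionStatusFromOverlord(String dataSource);
  }

  private final OverlordClient overlordClient;

  public CompactionStatusRouter(OverlordClient overlordClient)
  {
    this.overlordClient = overlordClient;
  }

  public String getCompactionStatus(String dataSource)
  {
    if (overlordClient.isCompactionSupervisorEnabled()) {
      // Feature enabled: the Overlord is the source of truth.
      return overlordClient.getCompactionStatusFromOverlord(dataSource);
    }
    // Feature disabled: fall back to the Coordinator's own view.
    return "status computed locally on Coordinator for " + dataSource;
  }
}
```

The key design point is that only the Coordinator ever forwards; the Overlord never redirects back, which rules out redirect loops.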
```java
    @JacksonInject CompactionScheduler scheduler
)
{
  final CompactionConfigValidationResult validationResult = scheduler.validateCompactionConfig(spec);
```
@gargvishesh , as discussed, I have added this validation.
Unfortunately, with the present structure, I cannot add a validation in `CoordinatorCompactionConfigsResource.updateClusterCompactionConfig()`, since that class is in the `druid-server` module while the `CompactionSupervisorSpec` class resides in `druid-indexing-service`.
If needed, we can try to do it in a follow-up PR by moving `CoordinatorCompactionConfigsResource` to the `druid-indexing-service` module.
```java
 * compacting intervals or segments that do not fulfil some required criteria.
 */
boolean isEligibleForCompaction(
    CompactionCandidate candidate,
    CompactionStatus currentCompactionStatus,
    CompactionTaskStatus latestTaskStatus
```

Check notice (Code scanning / CodeQL): Useless parameter, flagged on each of the three parameters above.
Description
-----------
Auto-compaction currently poses several challenges as it:
1. may get stuck on a failing interval.
2. may get stuck on the latest interval if more data keeps coming into it.
3. always picks the latest interval regardless of the level of compaction in it.
4. may never pick a datasource if its intervals are not very recent.
5. requires setting an explicit period, which does not cater to the changing needs of a Druid cluster.

This PR introduces various improvements to compaction scheduling to tackle the above problems.

Change Summary
--------------
1. Run compaction for a datasource as a supervisor of type `autocompact` on the Overlord.
2. Make the compaction policy extensible and configurable.
3. Track the status of recently submitted compaction tasks and pass this info to the policy.
4. Add a `/simulate` API on both Coordinator and Overlord to run compaction simulations.
5. Redirect compaction status APIs to the Overlord when compaction supervisors are enabled.
Description: #16768 introduces new compaction APIs on the Overlord, `/compact/status` and `/compact/progress`. But the corresponding `OverlordClient` methods do not return an object compatible with the actual endpoints defined in `OverlordCompactionResource`. This patch ensures that the objects are compatible.

Changes:
- Add `CompactionStatusResponse` and `CompactionProgressResponse`.
- Use these as the return type in `OverlordClient` methods and as the response entity in `OverlordCompactionResource`.
- Add `SupervisorCleanupModule`, bound on the Coordinator to perform cleanup of supervisors. Without this module, the Coordinator cannot deserialize compaction supervisors.
#16768 added the functionality to run compaction as a supervisor on the Overlord. This patch builds on top of that to restrict the MSQ engine to compaction in supervisor mode only. With these changes, users can no longer add the MSQ engine as part of a datasource compaction config, or as the default cluster-level compaction engine, on the Coordinator. The patch also adds an Overlord runtime property `druid.supervisor.compaction.engine=<msq/native>` to specify the default engine for compaction supervisors.

Since these updates require major changes to existing MSQ compaction integration tests, this patch disables MSQ-specific compaction integration tests; they will be taken up in a follow-up PR.

Key changed/added classes in this patch:
- `CompactionSupervisor`
- `CompactionSupervisorSpec`
- `CoordinatorCompactionConfigsResource`
- `OverlordCompactionScheduler`
Co-authored-by: Vishesh Garg <gargvishesh@gmail.com>
Usage
-----
1. Enable compaction to run as supervisors
Set `druid.supervisor.compaction.enabled=true` in the `runtime.properties` of the Overlord. Allowed values are `true` and `false` (default).

NOTE: This does not migrate existing compaction configs to the new supervisor flow.
2. Submit a compaction supervisor
POST `/druid/indexer/v1/supervisor`

Sample payload:

```
{
  "type": "autocompact",          // required
  "suspended": false,             // optional
  "spec": {                       // required
    "dataSource": "wikipedia",    // required
    "tuningConfig": {...},        // optional
    "granularitySpec": {...},     // optional
    ... other fields supported in compaction config ...
  }
}
```

3. Suspend a compaction supervisor
POST `/druid/indexer/v1/supervisor/{id}/suspend`

4. Resume a compaction supervisor
POST `/druid/indexer/v1/supervisor/{id}/resume`

5. Update cluster-level compaction configs
POST `/druid/coordinator/v1/config/compaction/cluster` (this API was added in #16803)

Payload structure:
6. Update policy in the compaction config
POST `/druid/coordinator/v1/config/compaction/cluster` (this API was added in #16803)
Sample payloads:
[A] Update policy to `smallestSegmentFirst` and `priorityDatasource` to `wikipedia`

```
{ "compactionPolicy": { "type": "newestSegmentFirst", "priorityDatasource": "wikipedia" } }
```

7. Simulate compaction run on Overlord (when `druid.supervisor.compaction.enabled=true`)

POST `/druid/indexer/v1/compaction/simulate`

Payload structure: same as `/druid/coordinator/v1/config/compaction/cluster`. Sample payloads: see next section.
8. Simulate compaction run on Coordinator (when `druid.supervisor.compaction.enabled=false`)

POST `/druid/coordinator/v1/compaction/simulate`

Sample payloads:

[A] Simulate with no config update

```
{ }
```

[B] Simulate with new policy

```
{ "compactionPolicy": { "type": "smallestSegmentFirst" } }
```

[C] Simulate with new policy and priority datasource

```
{ "compactionPolicy": { "type": "smallestSegmentFirst", "priorityDatasource": "wikipedia" } }
```

Implementation
--------------
1. Run compaction for a datasource as a supervisor of type `autocompact` on the Overlord

Why run compaction as a supervisor?
- Auto-compaction performs this same duty for tasks of type `compact`.
- As a supervisor, it can hook into the `TaskRunner` and be more reactive in nature.

Add `OverlordCompactionScheduler` to manage compaction on the leader Overlord:
- This may lead to some increase in the memory footprint of the Overlord.
- Runs the `CompactSegments` duty every 5 seconds to identify compactible intervals across all datasources.
- Reusing the `CompactSegments` duty ensures that the logic remains unchanged from traditional auto-compaction.
- Uses a `LocalOverlordClient` which redirects all queries to `TaskQueryTool` or `TaskQueue`. In traditional auto-compaction, these queries are HTTP API calls from the Coordinator to the Overlord.

2. Make compaction policy extensible and configurable
The policy can now be easily extended and specified as a cluster-level dynamic config.

The base policy is `PriorityBasedSegmentSearchPolicy`. A concrete policy can choose to implement the following methods:
- `getSegmentComparator()` to prioritize between segments and intervals
- `shouldSkipCompaction()` to completely skip a set of segments/intervals (if, say, they keep failing repeatedly)

Every policy supports a `priorityDatasource`. Other datasources are picked for compaction only after all the eligible intervals of the priority datasource have been compacted.
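To make the two hooks concrete, here is a toy sketch of a policy that orders candidates smallest-first and skips repeatedly failing ones. The types below (`ToySearchPolicy`, `Candidate`) are simplified stand-ins invented for illustration, not the real Druid interfaces:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Toy sketch of a priority-based search policy: order candidates with a
 * pluggable comparator and filter out those that should be skipped.
 */
public class ToySearchPolicy
{
  /** Simplified stand-in for a compaction candidate (one interval's segments). */
  public static class Candidate
  {
    public final String dataSource;
    public final long totalBytes;
    public final int failureCount;

    public Candidate(String dataSource, long totalBytes, int failureCount)
    {
      this.dataSource = dataSource;
      this.totalBytes = totalBytes;
      this.failureCount = failureCount;
    }
  }

  /** Analogous to getSegmentComparator(): smallest amount of data first. */
  public Comparator<Candidate> getSegmentComparator()
  {
    return Comparator.comparingLong(c -> c.totalBytes);
  }

  /** Analogous to shouldSkipCompaction(): skip repeatedly failing candidates. */
  public boolean shouldSkipCompaction(Candidate candidate)
  {
    return candidate.failureCount >= 3;
  }

  /** Return the eligible candidates in priority order. */
  public List<Candidate> prioritize(List<Candidate> candidates)
  {
    final List<Candidate> eligible = new ArrayList<>();
    for (Candidate c : candidates) {
      if (!shouldSkipCompaction(c)) {
        eligible.add(c);
      }
    }
    eligible.sort(getSegmentComparator());
    return eligible;
  }
}
```

A `newestSegmentFirst`-style policy would differ only in its comparator, which illustrates why the base policy keeps these two hooks separate.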
Policy `newestSegmentFirst`:
- Same behavior as the existing `newestSegmentFirst` policy.
- Supports `priorityDatasource`.
Policy `smallestSegmentFirst`:
- Supports `priorityDatasource`.

3. Track status of recently submitted compaction tasks and pass this info to policy
- Adds `CompactionStatusTracker`, which tracks the status of recently submitted and completed compaction tasks.
- This status is passed to the `shouldSkipCompaction()` method of the segment search policy.

4. Add
a `/simulate` API to run compaction simulations:
- Performs a dry run of the `CompactSegments` duty assuming unlimited task slots.

5. Redirect compaction status APIs to the Overlord when compaction supervisor is enabled
The Overlord exposes a new API `/druid/indexer/v1/compaction/isSupervisorEnabled`, which can be used by the leader Coordinator to determine if compaction is enabled to run as a supervisor. If this API returns true, the Coordinator does not run the auto-compaction duty at all and redirects the compaction progress APIs `/compaction/status` and `/compaction/progress` to the Overlord instead.

Main classes to review
----------------------
Pending items
LocalOverlordClientdoes not hit the metadata store too oftenRelease note
This PR has: