Conversation
Thanks @aho135 for the feature! I’ve left some high-level comments. I’m slightly wary of having the supervisor manage unsupervised backfill tasks in the background (similar to the limitations like task failures/resiliency/reporting you already called out in the PR summary).
As an alternative design, I wonder if it would be cleaner to leverage multi-supervisor support and spin up a new backfill supervisor via an API. For example, the high-level API could compute the offset ranges (similar to what's done in this PR), reset the running supervisor, and then submit a new backfill supervisor based on the active supervisor spec with a constructed transformSpec (similar to #18757 for reading topic partition-offset ranges). Another option could be introducing a separate ioConfig for a backfill supervisor, although that might be more involved and potentially overkill.
This would keep everything “supervised” and naturally address some of the limitations in the current design around task supervision. It would also provide a clear separation between the running and backfill supervisors from an operational standpoint.
What are your thoughts?
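To make the "compute the offset ranges" step concrete, here is a hypothetical sketch (class and method names are my own, not from this PR or the Druid codebase) of dividing the skipped per-partition offset ranges among a fixed number of backfill tasks:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only: partition the skipped [start, end) offset
// ranges among numTasks backfill tasks, round-robin by partition.
public class OffsetRangeSplit {
    static List<Map<Integer, long[]>> split(Map<Integer, long[]> skipped, int numTasks) {
        List<Map<Integer, long[]>> groups = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            groups.add(new TreeMap<>());
        }
        int i = 0;
        for (Map.Entry<Integer, long[]> e : skipped.entrySet()) {
            groups.get(i++ % numTasks).put(e.getKey(), e.getValue());
        }
        return groups;
    }

    public static void main(String[] args) {
        // each value is the skipped {startOffset, endOffset} for that partition
        Map<Integer, long[]> skipped = new TreeMap<>();
        skipped.put(0, new long[]{100, 250});
        skipped.put(1, new long[]{90, 300});
        skipped.put(2, new long[]{40, 120});
        List<Map<Integer, long[]>> groups = split(skipped, 2);
        System.out.println(groups.get(0).keySet()); // partitions assigned to task 0
        System.out.println(groups.get(1).keySet()); // partitions assigned to task 1
    }
}
```

Each resulting group could then seed one backfill task's (or backfill supervisor's) start/end sequence numbers.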
```java
// Get the backfillTaskCount from config
int backfillTaskCount = spec.getSpec().getIOConfig().getBackfillTaskCount();
```
Is there a reason to include this in the supervisor's ioConfig? It feels like a property that should be supplied by the operator when invoking the API, rather than being defined in the supervisor's ioConfig. It could instead default to taskCount / 2 (or a similar value) as part of the API invocation.
```java
// Before
public Response reset(@PathParam("id") final String id)

// After
public Response reset(
    @PathParam("id") final String id,
    @QueryParam("backfill") Boolean backfill
```
IMO it would be cleaner to introduce a new endpoint, /{id}/resetAndBackfill, especially if we're adding more backfill-specific properties.
Thanks for the review @abhishekrb19!
I was debating the same, but felt that this doesn't really fit into the existing StreamSupervisor implementation for these reasons:
I found the
These are good points. I wonder if there's any reason not to expose start and end sequence numbers in the supervisor config. Doing so would let the supervisor transition to a terminal state once those values are explicitly set and the specified offset range has been successfully read and published. Also tagging @gianm @jtuglu1 @kfaraz - curious if you have any thoughts here.
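A rough sketch of how that terminal-state check might look (purely illustrative; none of these names come from the actual Druid codebase): if the spec carried explicit end sequence numbers, the supervisor could compare published offsets against them and shut itself down once every partition is done.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: decide whether a backfill supervisor has read and
// published the whole requested range and may transition to a terminal state.
public class BackfillCompletion {
    static boolean backfillComplete(Map<Integer, Long> publishedOffsets, Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            Long published = publishedOffsets.get(e.getKey());
            // a partition is done once its published offset reaches the end offset
            if (published == null || published < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = new TreeMap<>(Map.of(0, 250L, 1, 300L));
        System.out.println(backfillComplete(Map.of(0, 250L, 1, 299L), end)); // false: partition 1 not done
        System.out.println(backfillComplete(Map.of(0, 250L, 1, 300L), end)); // true: all partitions done
    }
}
```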
```java
// If backfillTaskCount is not provided, default to taskCount / 2
int taskCount = spec.getSpec().getIOConfig().getTaskCount();
int numBackfillTasks = backfillTaskCount != null ? backfillTaskCount : Math.max(1, taskCount / 2);
```
[P1] Reject non-positive backfill task counts before resetting offsets
backfillTaskCount comes directly from the new query parameter and can be 0 or negative. In that case, numTasks becomes non-positive; the code later divides by numTasks, catches the resulting exception, and silently skips backfill submission. Because resetSupervisorAndBackfill has already reset the supervisor metadata to latest by that point, the API can acknowledge a successful reset while leaving the skipped range un-backfilled. Validate backfillTaskCount > 0 before performing the reset.
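A minimal sketch of the fail-fast validation suggested above (the method and message strings are hypothetical, for illustration only):

```java
// Illustrative only: reject a non-positive backfillTaskCount *before*
// resetting supervisor offsets, so a bad parameter cannot leave the
// skipped range un-backfilled after the metadata has been reset.
public class BackfillValidation {
    static String resetAndBackfill(Integer backfillTaskCount, int taskCount) {
        int numTasks = backfillTaskCount != null ? backfillTaskCount : Math.max(1, taskCount / 2);
        if (numTasks <= 0) {
            // fail fast: no supervisor metadata has been touched yet
            return "400 Bad Request: backfillTaskCount must be positive";
        }
        // ... reset offsets and submit numTasks backfill tasks here ...
        return "200 OK: submitted " + numTasks + " backfill tasks";
    }

    public static void main(String[] args) {
        System.out.println(resetAndBackfill(0, 4));    // rejected before any reset
        System.out.println(resetAndBackfill(null, 4)); // defaults to taskCount / 2
    }
}
```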
For the initial implementation I've only done Kafka. I plan to implement Kinesis and Rabbit in subsequent PRs, followed by documentation for the new feature.
This change adds an endpoint called resetOffsetsAndBackfill to SupervisorResource. This is a useful feature for operating Druid clusters where the most recent data is the most important (such as alerting use cases).
Description
Adds an endpoint called resetOffsetsAndBackfill to automatically ingest skipped data in the case where the offset is reset to latest. This requires useEarliestOffset=false; useConcurrentLocks=true, because there can be conflicting time intervals between the backfill task and the main supervisor tasks; and useTransaction=false, in order to disable metadata updates. The supervisor also needs to be in a running state in order to call updatePartitionLagFromStream() to get the latest offsets.

In addition, this change supports unsupervised SeekableStreamIndexTasks by setting useTransaction: false in the SeekableStreamIndexTaskIOConfig. The backfill tasks are one-off task submissions, so the useTransaction flag was extended to disable any checkpointing (refer to changes in SeekableStreamIndexTaskRunner).

Limitations
Since these tasks are unsupervised, in the event of task failures it would be tricky for operators to determine the offset up to which a failed backfill task had consumed, in order to submit a new backfill task. Some kind of configurable retry policy for these tasks could be a useful follow-up.
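For reference, the flag combination described in the Description above might look roughly like the following ioConfig fragment (illustrative only: the topic name is made up, other required fields are elided, and the exact placement of each flag in the spec may differ):

```json
{
  "type": "kafka",
  "topic": "example-topic",
  "useEarliestOffset": false,
  "useConcurrentLocks": true,
  "useTransaction": false
}
```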
Release note
Adds an optional parameter to the Supervisor reset endpoint to backfill the skipped data when the stream is reset to latest.
Key changed/added classes in this PR
SupervisorResource
SupervisorManager
SeekableStreamSupervisor
KafkaSupervisor