
Continuous automatic compaction #13852

Closed
suneet-s wants to merge 5 commits into apache:master from suneet-s:demo-continuous-compact

Conversation

@suneet-s (Contributor) commented Feb 27, 2023

Description

This change introduces the ability to have auto-compaction continuously
schedule compaction tasks as slots become available.

Previously, each run of the CompactSegments duty built an iterator based on
the latest segment metadata available. This meant that if the compact tasks
that were scheduled ran into any issues, like task lock contention, or an
interval which can not be compacted because of a bug, auto-compaction would
be stuck on the cluster.

With this change, CompactSegments refreshes its view of the segments based
on the druid.coordinator.compaction.searchPolicyRefreshPeriod property.
This allows auto-compaction to keep making progress on other intervals if
any single interval fails to compact, until the search policy is next refreshed.

Compaction statistics for the cluster are only refreshed when the search
policy is refreshed. This is because collecting statistics requires iterating
over the entire list of available segments on the cluster, which can take a
long time on large clusters.

To enable this behavior on the cluster, add something like this to the
coordinator runtime properties:

```
druid.coordinator.dutyGroups=["compaction"]
druid.coordinator.compaction.duties=["compactSegments"]
druid.coordinator.compaction.period=PT20S
```

Release note

New: You can now schedule automatic compaction to run continuously and configure
how frequently it should consider new segments for compaction.


High level implementation

CompactSegments remains the duty that performs auto-compaction related
processing. The period at which the CompactionSegmentSearchPolicy is reset
is decoupled from the CompactSegments duty period, allowing CompactSegments
to run independently of the segment metadata refresh.

Other implementation considerations
  • NewestSegmentFirstSearchPolicy#resetIfNeeded - To implement this, I had explored
    using a LoadingCache and a MemoizingSupplierWithExpiration, unfortunately neither
    of these options seemed to work well because of the need to know when the policy was
    reset so that compaction stats could be computed.
  • New CompactSegment duties vs re-using the existing duty: The existing duty code is dense
    and seemed risky to refactor since auto-compaction is a widely adopted feature. I explored the
    idea of deprecating the existing duty and introducing 2 new duties - one to reset the policy and
    another to read from the iterator. I abandoned this approach because of the risk of refactoring
    and because choosing how to wire up the duties seemed more complicated in this model.
Key changed/added classes in this PR
  • CompactionSegmentSearchPolicy#resetIfNeeded
  • CompactSegments#makeStats calculates stats without iterating over the iterator.
    This is done so that other calls to CompactSegments can continue to make progress
    and schedule other intervals for compaction without needing to reset the policy.
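To make the resetIfNeeded contract concrete, here is a minimal sketch of a time-driven reset that reports whether a reset actually happened, which is the signal the duty needs before recomputing stats. All names and the millisecond-clock parameter are illustrative assumptions, not the actual Druid classes:

```java
// Hypothetical sketch, not the real NewestSegmentFirst policy. The caller
// passes in the clock so duty runs stay deterministic and testable.
class SearchPolicySketch
{
  private final long refreshPeriodMillis;
  private long lastResetMillis;
  private boolean everReset = false;

  SearchPolicySketch(long refreshPeriodMillis)
  {
    this.refreshPeriodMillis = refreshPeriodMillis;
  }

  /**
   * Rebuilds the segment iterator if the refresh period has elapsed.
   * Returns true only when a reset happened, so the caller knows it is
   * time to recompute compaction statistics.
   */
  boolean resetIfNeeded(long nowMillis)
  {
    if (!everReset || nowMillis - lastResetMillis >= refreshPeriodMillis) {
      everReset = true;
      lastResetMillis = nowMillis;
      // ... rebuild the iterator from the latest segment metadata here ...
      return true;
    }
    return false;
  }
}
```

This is also why a LoadingCache or memoizing supplier was a poor fit: both hide the moment of expiry, while the duty needs the boolean result to decide when stats are safe to recompute.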

Future changes

  • With this change an operator still needs to place the CompactSegments duty
    in a custom duty group with a faster period than the indexing duty period
    to take advantage of it. A future change should make auto-compaction run
    continuously by default.
  • A new API should be introduced to enable a cluster operator to force reset
    the CompactionSegmentSearchPolicy at the next CompactSegments run. This will
    be useful in demo situations, or if a large ingestion has just taken place
    and the cluster operator wants to get auto-compaction to start processing these
    segments immediately.

This change is built on top of #13842

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@suneet-s suneet-s requested a review from maytasm February 27, 2023 00:45
@suneet-s suneet-s force-pushed the demo-continuous-compact branch from 1b70324 to 23ea578 on February 27, 2023 16:00
@suneet-s suneet-s marked this pull request as ready for review February 27, 2023 16:01
@clintropolis (Member) commented on lines +140 to +141:

```
@Config("druid.coordinator.compaction.searchPolicyRefreshPeriod")
@Default("PT5M")
```

curious, why here instead of on the CompactSegments duty? I guess that would make the property druid.coordinator.compaction.duty.searchPolicyRefreshPeriod instead

side note, coordinator config in general seems really complicated 😅 I had to read a bunch of code to understand how custom duties work and get wired up to stuff... and it's kind of strange.

I guess what I am getting at is that having this refresh period be more frequent than the duty period seems like it would be an incorrect configuration (or at least useless, since it would always reset), but I'm not entirely sure how such a check could actually be wired up. Maybe if the duty period was added to the properties that get injected, the compaction duty could pick it up or something?

There is also CoordinatorCompactionConfig to add to the confusion... not to mention druid.coordinator.kill.compaction.period which does live here...

@suneet-s (Contributor, Author) replied:

I had considered placing the config in CoordinatorCompactionConfig instead, but decided against it because that config is dynamic, and changing that value would not necessarily update the expiry time of the iterator, so this seemed like a better UX.

druid.coordinator.compaction.searchPolicyRefreshPeriod could use a better name. Its intent is to give operators the ability to choose when to re-build the view of the segment interval timeline considered for compaction. Today, the view is re-built every time the compact segments duty runs, so the property does not need to be tied to the CompactSegments duty.

@clintropolis (Member) commented Feb 28, 2023:

ah the name doesn't really bother me, it was more of a general comment about the config of this stuff being all over the place and that I have no idea where something should actually live. Given that, I think here is probably as fine as anywhere else 😛

@imply-cheddar (Contributor) commented Mar 1, 2023

Another way you could achieve the intended outcome and, I think, have a bit more straightforward code (at least, you wouldn't need any new configs) would be to introduce a new thread. Basically, continue to use the duty to figure out which time chunks should be compacted by looking at the segment list/whatever. The duty populates the state of some object, let's call it a CompactionJobQueue. The new thread that you create reads from the CompactionJobQueue and submits jobs until it fills up task slots. If the next job would consume too many slots, it waits for jobs to finish before submitting the next one.

In this way, the Duty can change the state of CompactionJobQueue whenever it thinks that it should be changed and the extra thread can submit jobs and wait for their completion regardless of the duty's scheduled runtime. If I'm understanding your code in this PR correctly, my suggestion is not too far away, it's just more straightforward in terms of what is doing what. I will admit that I do not fully understand how the code in this PR is achieving continuous running of jobs, but I assume it's using the same logical idea as the shared Queue object and then some cleverness somewhere to make things actually run.

Additionally, you have no tests that validate that jobs will continue to run even when the duty hasn't run yet. The CoordinatorSimulation is intended to allow you to create tests that simulate states of clusters. Make sure to add tests that validate the behaviors you are looking for.
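The CompactionJobQueue split described above can be sketched roughly as follows. All names are illustrative assumptions from the comment, not existing Druid classes; a real implementation would submit tasks to the Overlord and run the drain on its own thread:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the proposed design, not actual Druid code.
// The duty refreshes the queue from segment metadata on its schedule;
// a separate submitter drains it whenever task slots are free.
class CompactionJobQueue
{
  private final Deque<String> intervals = new ArrayDeque<>();

  // Called by the CompactSegments duty whenever it rebuilds its view.
  synchronized void replaceWith(List<String> compactibleIntervals)
  {
    intervals.clear();
    intervals.addAll(compactibleIntervals);
  }

  // Called by the submitter thread; returns the jobs submitted this pass.
  synchronized List<String> drain(int availableTaskSlots)
  {
    List<String> submitted = new ArrayList<>();
    while (submitted.size() < availableTaskSlots && !intervals.isEmpty()) {
      submitted.add(intervals.poll()); // submit a compaction task here
    }
    return submitted;
  }
}
```

The key property is the decoupling: the duty mutates the queue on its own cadence, while the submitter consumes it on the cadence of task-slot availability, with no refresh-period config needed.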

```
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
ClientCompactionTaskQueryTuningConfig.from(
    config.getTuningConfig(),
    config.getMaxRowsPerSegment(),
```

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation. Invoking DataSourceCompactionConfig.getMaxRowsPerSegment should be avoided because it has been deprecated.
@kfaraz (Contributor) commented Mar 1, 2023

Thanks a lot for taking this up, @suneet-s !

I had a similar approach in mind to what @imply-cheddar has described.
@suneet-s , I think you too have mentioned this as a design that you considered, breaking up the CompactSegments duty into two.

I feel it would be a cleaner approach and would avoid the need to add a new config.

  • Keep the CompactSegments duty mostly unchanged
    • it would run at the custom period, if specified, otherwise, the default indexingPeriod (30 min)
    • it continues to call policy.reset() to create a new iterator
    • pass the CompactionJobQueue to this duty and update it with the new iterator
    • Optional: Rename this duty to reflect that it just identifies compactible intervals (maybe GetIntervalsForCompaction or something) and does not actually invoke compaction.
  • Add a new duty QueueCompactionTasks
    • This runs as frequently as other coordinator duties (maybe along with historical management duties, i.e. every 1 min or so).
    • Avoids the need for a separate executor/thread and is frequent enough to make the best use of available compaction task slots.
    • Asks the CompactionJobQueue for the next compaction tasks to queue and sends them to the Overlord
    • For the most part, this translates to moving the CompactSegments.doRun() method to this new duty
  • Stats computation
    • It can remain in CompactSegments if it is a costly operation
    • If required, we could even choose to expand CompactionJobQueue to be a ClusterCompactionState which could accumulate stats and snapshots as and when they are built and report them when necessary.

The other advantages of this approach are that users can benefit from it even if they don't specify a custom period for CompactSegments. (In the future, it might even help us do away with having CompactSegments as a custom duty, which is a little weird since compaction is a core feature of the ingestion system.)

@suneet-s (Contributor, Author) commented Mar 1, 2023

Thanks for the reviews @imply-cheddar and @kfaraz

Another way you could achieve the intended outcome and, I think, have a bit more straightforward code (at least, you wouldn't need any new configs) would be to introduce a new thread.

I described this approach in the alternatives considered section of the PR. It does clean up the code a little bit (primarily that CompactionSegmentSearchPolicy#resetIfNeeded doesn't need to return a pair so callers know when the iterator was reset). However, we still end up with 2 configs - one for the period at which the iterator should be reset and another for the period at which compact tasks should be scheduled. The other reason I did not go with this approach is to preserve compatibility for users currently using the CompactSegments duty as a custom duty. Trying to disable the old way of running auto-compaction and enable 2 new duties to run it would make things more error prone imo.

The CoordinatorSimulation is intended to allow you to create tests that simulate states of clusters.

Thanks for this. I did not know about this class; I will add some tests here.

The other advantages of this approach are that users can benefit from it even if they don't specify a custom period for CompactSegments.

I have purposefully implemented this change in a way where there should be no user visible change yet. I would like to be able to enable this on a few production clusters and test this in real world scenarios before Druid users are opted in to this by default. A future patch should change the defaults so users benefit from this even if they make no configuration changes.

@suneet-s (Contributor, Author) commented Mar 1, 2023

I really want to be able to start iterating on this and pushing up more patches to expand on the functionality. Other than the comment about tests (which I strongly agree with and can address shortly), are any of these comments meant to block the PR from being merged?

I haven't added docs in this PR because I don't want users to know about this capability yet. Some things like the config might change in future patches (based on the feedback in this PR and use on real production clusters), but I believe that as written there is no user-visible change unless users know about this setting.

@imply-cheddar (Contributor) replied:
This change does not deserve a new config and we cannot merge it in a way that requires a new config. If we separate out to a queue that is read, then the Duty stays the same and populates that queue, the thread just runs stuff based on whatever is in that queue. It does not need an extra config and does not need any extra thought by end users. It will immediately benefit all deployments. Adding more configs is a very expensive operation and adding a new config that is hard to understand (why would we be setting the reset longer than something else?) is even worse, especially for functionality that will benefit all deployments and that we want on for everybody.

Please fix the PR to remove all new configs and make it just operate by having the duty mutate a queue (each time it wakes up) and have a new thread that reads from that queue and submits jobs in accordance with whatever configurations are set in terms of how many task slots can be used.

As it is, I'm -1 on this PR until these changes are made.

@paul-rogers (Contributor) commented:
I’m a newbie to this code so I took a deep dive to understand how it works. The compaction scheduling algorithm is rather crude: it isn’t well designed for a large, busy, distributed system. @suneet-s’s fix is a worthwhile short-term fix, but this code really needs a redesign along the lines outlined by @imply-cheddar and @kfaraz.

For anyone else new to this area, here’s an overview of the logic. Please forgive any newbie misunderstandings. At the highest level:

  • The coordinator provides a set of duties which are called one after another on a schedule.
  • Duties have Guice-injected global state, but no local state across calls.
  • Each duty is called from the main duty loop and thus must complete quickly.
  • The compaction scheduler is a duty. As such, each call is independent from any previous call, and the scheduler must make its decisions quickly.

The challenge, then, is to design the compaction scheduler to be stateless and fast. On each call it:

  • Determines the number of available slots, accounting for running tasks.
  • Creates a big iterator over all datasources with compaction specs and their segments sorted by time chunk, minus any segments locked by existing compaction or ingestion tasks.
  • Launches a set of compaction tasks from the head of the iterator until slots are full.
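The stateless per-run behavior those steps describe can be sketched like this (hypothetical names; the real duty walks Druid's segment timeline and task-runner state rather than a plain list):

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the existing stateless duty run, not Druid code.
class SchedulerSketch
{
  static int runOnce(int totalSlots, int runningCompactTasks, List<String> candidates)
  {
    int availableSlots = totalSlots - runningCompactTasks;
    // The iterator is rebuilt from scratch on every duty run, so failed
    // time chunks at the head are retried before anything further down.
    Iterator<String> it = candidates.iterator();
    int launched = 0;
    while (launched < availableSlots && it.hasNext()) {
      it.next(); // launch a compaction task for this time chunk
      launched++;
    }
    return launched;
  }
}
```

Because nothing survives between calls to runOnce, a chunk at the head that keeps failing is re-launched every run, which is exactly the "stuck" behavior described below.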

In short, each call to the scheduler duty builds a large list of candidate actions: “an iterator based on the latest segment metadata available”. The scheduler then starts working down that list from the head, stopping when the worker slots are filled. If “the compact tasks that were scheduled ran into any issues, like task lock contention, or an interval which can not be compacted because of a bug” then the next invocation of scheduler will launch new tasks for those same datasources and time chunks. It will do so because the scheduler maintains no state that would tell it that those very time chunks just failed. The new tasks may also fail. The result is “auto-compaction would be stuck on the cluster.”

The design works well if the number of datasource and segments is small relative to the number of available compaction tasks. That assumption is not valid on larger systems.

The design also assumes that new ingestion tasks will seldom cancel compaction tasks, hence a start-from-the-top approach will make progress. That assumption is not valid on a system with late-arriving data. Yet, late-arriving data is a fact of life in many systems.

Again, this code needs a redesign. But, that is a major project. So, what short-term solution can we apply instead?

The obvious solution is to allow the list (the iterator) to persist, and to have the scheduler work its way through the entire iterator before starting over again. Doing so requires the scheduler to maintain state.

Suneet's fix places the iterator state not on the scheduler itself, but rather on a CompactionSegmentSearchPolicy instance, which is a bit of a back-door solution: the coordinator duty does not maintain state, but a Guice-injected global dependency does.

The original code built the iterator anew on each scheduler invocation. The new code hedges its bets: it does not build the iterator on each call, but rather after some amount of time. There is much discussion about the configuration of this refresh period. No doubt it would be quite hard for anyone to come up with a good number.

Perhaps one simple solution is to omit the refresh period. Instead, the scheduler works its way through one entire list (iterator) before looping back to the start. Let's call this "one cycle". The full-cycle approach ensures all time chunks have a shot at compaction. If a task fails during one cycle, that chunk will be tried again in the next cycle, perhaps after any active ingestion has completed. Failures in compaction will occur once per cycle, not once per scheduler invocation. Compaction will no longer become "stuck."
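A rough sketch of the full-cycle idea, assuming the duty keeps the iterator as state and rebuilds it only once exhausted (all names are hypothetical, not actual Druid classes):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the full-cycle proposal, not Druid code. The
// iterator persists across runs and is only rebuilt after every candidate
// time chunk has had a shot at compaction.
class FullCycleScheduler
{
  private final Supplier<List<String>> candidateSource;
  private Iterator<String> cycle;

  FullCycleScheduler(Supplier<List<String>> candidateSource)
  {
    this.candidateSource = candidateSource;
  }

  int runOnce(int availableSlots)
  {
    if (cycle == null || !cycle.hasNext()) {
      // Previous cycle exhausted: begin a new one from fresh metadata.
      cycle = candidateSource.get().iterator();
    }
    int launched = 0;
    while (launched < availableSlots && cycle.hasNext()) {
      cycle.next(); // launch a compaction task for this time chunk
      launched++;
    }
    return launched;
  }
}
```

Under this scheme a failing chunk is retried once per cycle rather than once per invocation, and no refresh-period config is needed.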

A risk, of course, is that the information in the iterator becomes stale. The system is distributed and should already handle that case: race conditions are to be expected. Another risk is that new data arrives faster than a cycle can run. In this case, no algorithm can solve the problem: more compaction resources are the only answer.

So, one question is, would the full-cycle idea work in practice?

The current PR is clearly an improvement. A redesign, as outlined by @imply-cheddar and @kfaraz would be a project, made more of a challenge because tests in this area appear to be sparse. The code is essentially untestable without a cluster because it is non-modular and is tightly coupled with other parts of the system. So, a second question is: can we accept this short-term fix or would we rather wait indefinitely for someone to tackle the sorely-needed redesign?

@github-actions Bot commented Feb 8, 2024

This pull request has been marked as stale due to 60 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If you think
that's incorrect or this pull request should instead be reviewed, please simply
write any comment. Even if closed, you can still revive the PR at any time or
discuss it on the dev@druid.apache.org list.
Thank you for your contributions.

@github-actions github-actions Bot added the stale label Feb 8, 2024
@github-actions Bot commented Mar 8, 2024

This pull request/issue has been closed due to lack of activity. If you think that
is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions Bot closed this Mar 8, 2024


7 participants