
Create dynamic config that can limit number of non-primary replicants loaded per coordination cycle #11135

Merged
capistrant merged 9 commits into apache:master from capistrant:replica-load-limiter
May 5, 2021

Conversation

@capistrant (Contributor) commented Apr 19, 2021

Start Release Notes

Adds new Dynamic Coordinator Config maxNonPrimaryReplicantsToLoad with default value of Integer.MAX_VALUE. This configuration can be used to set a hard upper limit on the number of non-primary replicants that will be loaded in a single Druid Coordinator execution cycle. The default value will mimic the behavior that exists today.

Example usage: If you set this configuration to 1000, the Coordinator duty RunRules will load a maximum of 1000 non-primary replicants in each RunRules execution. For example, if you ingested 2000 segments with a replication factor of 2, the coordinator would load 2000 primary replicants and 1000 non-primary replicants on the first RunRules execution; the remaining 1000 non-primary replicants would be loaded on the next RunRules execution.
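The per-cycle behavior described above can be illustrated with a small sketch (a hypothetical helper, not Druid's actual code): it splits a pool of pending non-primary replicants into per-cycle batches under the cap.

```java
// Hypothetical sketch (not Druid's real classes): how a per-cycle cap on
// non-primary replicants spreads replica loads across RunRules executions.
public class ReplicaLoadSketch
{
  /** Returns how many non-primary replicants get loaded in each RunRules cycle. */
  public static int[] loadsPerCycle(int pendingNonPrimary, int maxPerCycle)
  {
    // Ceiling division: the number of cycles needed to drain the backlog.
    int cycles = (pendingNonPrimary + maxPerCycle - 1) / maxPerCycle;
    int[] loaded = new int[cycles];
    for (int i = 0; i < cycles; i++) {
      loaded[i] = Math.min(maxPerCycle, pendingNonPrimary);
      pendingNonPrimary -= loaded[i];
    }
    return loaded;
  }
}
```

With the release-note example (2000 segments at replication factor 2, config set to 1000), the 2000 non-primary replicants load as two batches of 1000 across two cycles.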

End Release Notes

Description

Add a new dynamic configuration to the coordinator that gives an operator the power to set a hard limit for the number of non-primary segment replicas that are loaded during a single execution of RunRules#run. This allows the operator to limit the amount of work loading non-primary replicas that RunRules will execute in a single run. An example of a reason to use a non-default value for this new config is if the operator wants to ensure that major events such as historical service(s) leaving the cluster, large ingestion jobs, etc. do not cause an abnormally long RunRules execution compared to the cluster's baseline runtime.

Example

cluster: 3 historical servers in _default_tier with 18k segments per server. Each segment belongs to a datasource that has the load rule "LoadForever 2 replicas on _default_tier". The cluster load status is 100% loaded.

Event: 1 historical drops out of the cluster.

Today: The coordinator will load all 18k segments that are now under-replicated in a single execution of RunRules (as long as Throttling limits are not hit and there is capacity)

My change: The coordinator can load a limited number of these under-replicated segments IF the operator has tuned the new dynamic config down from its default. For instance, the operator could set it to 2k, meaning it would take at least 9 coordination cycles to fully replicate the segments that were on the recently downed host.

Why

Operators need to balance lots of competing needs. Having the cluster fully replicated is great for HA. But if an event causes the coordinator to take 20 minutes to fully replicate because it has to load thousands of replicas, we sacrifice the timeliness of loading newly ingested segments that were inserted into the metastore after this long coordination cycle started. Maybe the operator cares more about that fresh data timeliness than the replication status, so they change the new config to a value that causes RunRules to take less time but require more execution cycles to bring the data back to full replication.

Really, what the change aims to do is give an operator more flexibility. As written, the default gives the operator exactly the same functionality they see today.

Design

I folded this new configuration and feature into ReplicationThrottler. Throttling replication is essentially what this feature does, just in a new way compared to the existing ReplicationThrottler functionality.
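A minimal sketch of the counting idea (illustrative names only, not the actual ReplicationThrottler API): a per-run counter that switches the run into primary-only mode once the cap is hit.

```java
// Hypothetical sketch of the per-run cap; names are illustrative and do not
// match Druid's real ReplicationThrottler methods.
public class NonPrimaryReplicantLimiter
{
  private final int maxNonPrimaryReplicantsToLoad;
  private int loadedThisRun = 0;

  public NonPrimaryReplicantLimiter(int maxNonPrimaryReplicantsToLoad)
  {
    this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
  }

  /** Checked before assigning each non-primary replicant. */
  public boolean isLoadPrimaryReplicantsOnly()
  {
    return loadedThisRun >= maxNonPrimaryReplicantsToLoad;
  }

  /** Called whenever a non-primary replicant assignment is made. */
  public void recordNonPrimaryLoad()
  {
    loadedThisRun++;
  }

  /** Reset at the start of each RunRules execution. */
  public void resetForNewRun()
  {
    loadedThisRun = 0;
  }
}
```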


Key changed/added classes in this PR
  • CoordinatorDynamicConfig
  • ReplicationThrottler
  • RunRules
  • LoadRule

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster.

@a2l007 (Contributor) commented Apr 20, 2021

Thanks for the PR! This config should come in handy to reduce coordinator churn in case historicals fall out of the cluster. Have you thought about configuring maxNonPrimaryReplicantsToLoad specific to a tier instead of a global property?
Also could you please add some docs related to this property to the configuration docs?

@capistrant (Contributor, Author)

> Thanks for the PR! This config should come in handy to reduce coordinator churn in case historicals fall out of the cluster. Have you thought about configuring maxNonPrimaryReplicantsToLoad specific to a tier instead of a global property?
> Also could you please add some docs related to this property to the configuration docs?

I added the missing docs.

I had not thought about making this a per-tier setting. I'm coming at it from the angle of an operator not caring if the non-primary replicants are in tier X, Y, or Z, but rather just wanting to make sure the coordinator never spends too much time loading these segments and not doing its other jobs, mainly discovering and loading newly ingested segments.

@a2l007 (Contributor) left a comment

Thanks. I've left a couple of minor comments.
It's strange that Spell Check has flagged the word "replicants". We might have to add it to our .spelling file as well.

Comment thread: docs/configuration/index.md (Outdated)

|Property|Description|Default|
|--------|-----------|-------|
|`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70|
|`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false|
|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false|
|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle.|`Integer.MAX_VALUE`|
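For reference, the property is updated like any other coordinator dynamic config, by POSTing the config JSON to the coordinator's dynamic config endpoint (`/druid/coordinator/v1/config`). The payload below is only a minimal sketch showing this one field; a real update normally carries the full dynamic config object, and the value 2000 is just an illustrative choice.

```json
{
  "maxNonPrimaryReplicantsToLoad": 2000
}
```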
@a2l007 (Contributor):
It would be useful if we add some info regarding what could be a good starting value to set this to.

```java
// excerpt from LoadRule (diff context; truncated)
    && !paramsWithReplicationManager.getReplicationManager().isLoadPrimaryReplicantsOnly()
) {
  log.info(
      "Maximum number of non-primary replicants [%d] have been loaded for the current RunRules execution. Only loading primary replicants from here on.",
```
@a2l007 (Contributor):
Since this behavior is valid only for the present coordinator run, the log message might be clearer with something like "Only loading primary replicants from here on for this coordinator run period"

@a2l007 (Contributor) left a comment
+1 after CI

@capistrant (Contributor, Author)

https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java#L141

This PR has a similar issue that resulted in this block of code. I think I will use the same solution for now, but long term it would be nice to have a more elegant solution.

```java
// excerpt from CoordinatorDynamicConfig (diff context; truncated)
this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;

if (maxNonPrimaryReplicantsToLoad == null) {
```
@a2l007 (Contributor):
Should we consider using 0 as a non-configured value and changing the check here? That would avoid the primitive type change.

@capistrant (Contributor, Author):
Hmm, I think I would be ok with that. I don't see any valid use case where a value of 0 would be required by the user. At that point they would want to disable replication via load rules.

@capistrant (Contributor, Author):
Although doing this would somewhat hide a user error: if they submit 0 and we change 0 to the default and log it, they wouldn't know 0 is invalid.

@a2l007 (Contributor):
Yeah I'm fine with leaving it as Integer until we have a better solution in place to fix the dynamic config behavior during upgrade. It would be useful to log an issue for that behavior in case somebody would like to work on it.

@capistrant (Contributor, Author)

@a2l007 are you okay with merging this week, now that the issue for pursuing a cleaner configuration strategy is created?

@a2l007 (Contributor) commented May 4, 2021

@capistrant Yup, LGTM. Thanks!

@kfaraz (Contributor) commented Aug 4, 2022

@capistrant , I was taking a look at the maxNonPrimaryReplicantsToLoad config but I couldn't really distinguish it from replicationThrottleLimit.

I see that you have made a similar observation here:

> I folded this new configuration and feature into ReplicationThrottler. That is essentially what it is doing, just in a new way compared to the current ReplicationThrottler functionality.

Could you please help me understand the difference between the two? In which case would we want to tune this config rather than tuning the replicationThrottleLimit itself?

@capistrant (Contributor, Author)

> @capistrant , I was taking a look at the maxNonPrimaryReplicantsToLoad config but I couldn't really distinguish it from replicationThrottleLimit.
>
> I see that you have made a similar observation here:
>
> > I folded this new configuration and feature into ReplicationThrottler. That is essentially what it is doing, just in a new way compared to the current ReplicationThrottler functionality.
>
> Could you please help me understand the difference between the two? In which case would we want to tune this config rather than tuning the replicationThrottleLimit itself?

My observation is that maxNonPrimaryReplicantsToLoad is a new way of throttling replication, not that it is doing the same thing as replicationThrottleLimit.

replicationThrottleLimit is a limit on the number of in-progress replica loads at any one time during RunRules. We track the in-progress loads in a list; items are removed from that list when a LoadQueuePeon issues a callback on completion of the load.

maxNonPrimaryReplicantsToLoad is a hard limit on the total number of replica loads during RunRules. Once it is hit, no more non-primary replicas are created for the rest of RunRules.

You'd want to tune maxNonPrimaryReplicantsToLoad if you want to put an upper bound on the work the coordinator does loading non-primary replicas per execution of RunRules. The reason we use it at my org is that we want the coordinator to avoid "putting its head in the sand" and loading replicas for an undesirable amount of time instead of finishing its duties and refreshing its metadata. An example of an "undesirable amount of work" is if a Historical drops out of the cluster momentarily while the Coordinator is refreshing its SegmentReplicantLookup. The coordinator suddenly thinks X segments are under-replicated. But if the Historical is coming back online (say, after a restart to deploy new configs), we don't want the Coordinator to spin and load those X segments when it could just finish its duties and notice that the segments are not under-replicated anymore.

I'm not aware of all the reasons for using replicationThrottleLimit. It didn't meet my org's needs for throttling replication, which is why I introduced the new config. I guess it is a way to avoid flooding the cluster with replica loads? My clusters have actually tuned that value up to avoid hitting it at its low default. We don't care about the number of in-flight loads; we just care about limiting the total number of replica loads per RunRules execution.
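To make the distinction concrete, here is a side-by-side sketch (purely illustrative classes, not Druid's real ones): the first counter is decremented when a load completes, while the second never is within a run.

```java
// Illustrative sketch of the two throttling styles described above.
public class ThrottleStyles
{
  /** replicationThrottleLimit-style: caps loads *in flight* at any moment. */
  public static class InFlightThrottle
  {
    private final int limit;
    private int inFlight = 0;

    public InFlightThrottle(int limit)
    {
      this.limit = limit;
    }

    public boolean tryStartLoad()
    {
      if (inFlight >= limit) {
        return false;
      }
      inFlight++;
      return true;
    }

    /** The load-completion callback frees a slot for a new load. */
    public void onLoadComplete()
    {
      inFlight--;
    }
  }

  /** maxNonPrimaryReplicantsToLoad-style: caps *cumulative* loads per run. */
  public static class PerRunCap
  {
    private final int cap;
    private int loadedThisRun = 0;

    public PerRunCap(int cap)
    {
      this.cap = cap;
    }

    public boolean tryStartLoad()
    {
      if (loadedThisRun >= cap) {
        return false;
      }
      loadedThisRun++;  // never decremented until the next run
      return true;
    }
  }
}
```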

Let me know if that clarification is still not making sense.

@kfaraz (Contributor) commented Aug 7, 2022

Thanks for the explanation, @capistrant !
I completely agree with your opinion that the coordinator should not get stuck in a single run and should always keep moving, thereby refreshing its metadata snapshot. I suppose the other open PR from you is in the same vein.

I also think replicationThrottleLimit should probably have done this in the first place, as it was trying to solve the same problem that you describe. Putting the limit on the number of replica loads "currently in progress" is not a very good safeguard to achieve this.

Thanks for adding this config, as I am sure it must come in handy for proper coordinator management.

kfaraz added a commit that referenced this pull request Sep 4, 2023
Changes:

[A] Remove config `decommissioningMaxPercentOfMaxSegmentsToMove`
- It is a complicated config 😅 , 
- It is always desirable to prioritize move from decommissioning servers so that
they can be terminated quickly, so this should always be 100%
- It is already handled by `smartSegmentLoading` (enabled by default)

[B] Remove config `maxNonPrimaryReplicantsToLoad`
This was added in #11135 to address two requirements:
- Prevent coordinator runs from getting stuck assigning too many segments to historicals
- Prevent load of replicas from competing with load of unavailable segments

Both of these requirements are now already met thanks to:
- Round-robin segment assignment
- Prioritization in the new coordinator
- Modifications to `replicationThrottleLimit`
- `smartSegmentLoading` (enabled by default)
jakubmatyszewski pushed a commit to jakubmatyszewski/druid that referenced this pull request Sep 8, 2023 (same commit message as above, with the PR referenced as apache#11135).