Motivation and goal
Currently, there is no segment balancing/loading algorithm. There is just code that happens to behave in some way that likely nobody fully understands. It leads to incoherent/repeated configurations (#7159) and defects (#7202, #7383, #6329). It should also be possible to significantly speed up balancing/loading by not waiting until all segment movements from the previous balancing burst are done (#7159, #7344).
The goal of this proposal is to establish terminology, introduce a coherent and more operationally-friendly set of configuration parameters, and define the balancing/loading algorithm.
Terminology
First, let's establish terminology.
There are two groups of segments:
Replicated segments: governed by LoadRules.
Broadcasted segments: governed by BroadcastDistributionRules.
Unreplicated segments are replicated segments that should be, but are not currently, loaded by any historical server.
Underreplicated segments (in a tier) are replicated segments whose target replication count (replicants) in the tier is higher than the number of historical servers in the tier currently loading the segment. In the algorithm below, unreplicated and underreplicated (or fully replicated) segments are treated as disjoint groups: an unreplicated segment is not also called underreplicated in every tier with zero currently loading servers. But as soon as a segment is loaded by at least one server in any tier, it becomes underreplicated in all other tiers where it should be loaded.
Fully replicated segments (in a tier) are replicated segments whose target replication count in the tier is equal to the number of historical servers in the tier currently loading the segment.
Overreplicated segments (in a tier) are replicated segments whose target replication count in the tier is lower than the number of historical servers in the tier currently loading the segment.
Underbroadcasted segments (in a tier) are broadcasted segments that are not loaded on some servers where they should be loaded. These servers include decommissioning servers, except decommissioning servers that currently hold only broadcasted segments (see #7383).
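To make the disjointness of "unreplicated" and "underreplicated" concrete, here is an illustrative Python sketch (hypothetical names, not Druid code) that classifies a replicated segment from its per-tier target and actual replication counts:

```python
def classify_replicated_segment(target_replicants, loaded_replicants):
    """Classify a replicated segment per tier.

    target_replicants: dict tier -> target replication count (from LoadRules)
    loaded_replicants: dict tier -> number of historical servers currently
                       loading/serving the segment in that tier
    Returns the string 'unreplicated' if no server anywhere loads the segment,
    otherwise a dict tier -> 'underreplicated' | 'fully replicated' |
    'overreplicated'.
    """
    total_loaded = sum(loaded_replicants.values())
    if total_loaded == 0:
        # Disjoint from "underreplicated": an unreplicated segment is not
        # also counted as underreplicated in every tier with zero loaders.
        return "unreplicated"
    states = {}
    for tier, target in target_replicants.items():
        loaded = loaded_replicants.get(tier, 0)
        if loaded < target:
            states[tier] = "underreplicated"
        elif loaded == target:
            states[tier] = "fully replicated"
        else:
            states[tier] = "overreplicated"
    return states
```

For example, a segment with targets {hot: 2, cold: 1} loaded on a single hot server is underreplicated in both tiers, while the same segment loaded nowhere is just unreplicated.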
Configurations
Existing configuration, still used
maxSegmentsInNodeLoadingQueue.
New configurations
numBestServersChoosingFunction, a JavaScript function. Defaults to function (numServersInTier, targetReplicants) { return targetReplicants + Math.floor(Math.log(numServersInTier) / Math.log(numBestServersChoosingFunctionDefaultLogBase)); }
numBestServersChoosingFunctionDefaultLogBase, defaults to 5. Determines the specific shape of the default numBestServersChoosingFunction, which determines the number of best servers (according to CostBalancerStrategy) in a tier to consider loading a segment on. The single best server may be unavailable for loading if its loading queue is already full (with several different definitions of "full", explained below). In the current algorithm, numBestServersChoosingFunction is implicitly return targetReplicants; (if we were to define it); in other words, we always consider only the targetReplicants "best" servers in a tier for loading.
numBestServersChoosingFunction (or numBestServersChoosingFunctionDefaultLogBase if used to adjust the default function) is the single parameter that allows trading off loading speed against precision: a higher number allows higher balancing throughput, but balancing decisions may be less optimal.
With the default function equivalent to return targetReplicants + Math.floor(Math.log(numServersInTier) / Math.log(5));, the behavior will be as "precise" as the current behavior in tiers of fewer than 5 servers, have a +1 server "imprecision allowance" in tiers of 5 to 24 servers, +2 servers in tiers of 25 to 124 servers, etc.
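For illustration, the default function can be reproduced in Python (a sketch of the same formula; the names mirror the proposal, the implementation is only illustrative):

```python
import math

def num_best_servers(num_servers_in_tier, target_replicants, log_base=5):
    # Default numBestServersChoosingFunction: the targetReplicants "best"
    # servers plus a logarithmically growing "imprecision allowance".
    if num_servers_in_tier <= 0:
        return 0
    return target_replicants + math.floor(
        math.log(num_servers_in_tier) / math.log(log_base))

# With targetReplicants = 2: 4 servers in a tier give B = 2 (no allowance),
# 6 servers give B = 3 (+1 allowance), 26 servers give B = 4 (+2).
```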
loadingMinPercent, defaults to 70.
decommissioningMinPercent, defaults to 10.
decommissioningMaxPercent, defaults to 70, or decommissioningMaxPercentOfMaxSegmentsToMove if this legacy parameter is specified.
balancingMinPercent, defaults to 20.
There are no "loadingMaxPercent" and "balancingMaxPercent" because there is no reason to cap loading and balancing (as long as all "minPercents" are satisfied).
loadingMinPercent + decommissioningMinPercent + balancingMinPercent must be equal to 100. If only one of these three parameters is specified, the other two are determined by applying their default proportions to 100 - theSpecifiedMinPercent. If two of these three parameters are specified, the third one is determined by complementing to 100.
These "minPercents" and "maxPercents" are percentages of maxSegmentsInNodeLoadingQueue, not of maxSegmentsToMove. This is not reflected in the parameter names to avoid overspecifying the algorithm (as with decommissioningMaxPercentOfMaxSegmentsToMove, whose name should become obsolete very soon after its introduction).
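The defaulting rules described above can be sketched as follows (illustrative Python; the resolution logic follows the paragraph above, the function name is made up):

```python
def resolve_min_percents(loading=None, decommissioning=None, balancing=None):
    """Resolve loadingMinPercent / decommissioningMinPercent /
    balancingMinPercent so that they sum to 100:
    - none specified: use the defaults (70/10/20);
    - one specified: split the remainder between the other two in their
      default proportions;
    - two specified: the third complements the sum to 100;
    - all three specified: they must already sum to 100.
    """
    defaults = {"loading": 70, "decommissioning": 10, "balancing": 20}
    given = {"loading": loading, "decommissioning": decommissioning,
             "balancing": balancing}
    missing = [k for k, v in given.items() if v is None]
    specified_sum = sum(v for v in given.values() if v is not None)
    if not missing:
        if specified_sum != 100:
            raise ValueError("the three minPercents must sum to 100")
        return given
    if len(missing) == 1:
        given[missing[0]] = 100 - specified_sum
    else:
        remainder = 100 - specified_sum
        default_sum = sum(defaults[k] for k in missing)
        for k in missing:
            given[k] = remainder * defaults[k] / default_sum
    return given
```

For example, specifying only balancingMinPercent=20 leaves loading/decommissioning at their default 70/10 split, and specifying loading=50 with decommissioning=20 yields balancing=30.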
segmentLoadingMaxSecondsBeforeAlerting, defaults to 900 (15 minutes), or replicantLifetime * druid.coordinator.period if the legacy replicantLifetime parameter is specified.
Retired configurations
maxSegmentsToMove
replicationThrottleLimit
decommissioningMaxPercentOfMaxSegmentsToMove, unless used to default decommissioningMaxPercent for backward compatibility.
replicantLifetime, unless used to default segmentLoadingMaxSecondsBeforeAlerting for backward compatibility.
Proposed algorithm
1. Unreplicated segments
A "loading" stage.
Use CostBalancerStrategy to determine top B "best" servers in every tier where the segment should be loaded, excluding decommissioning servers, where B is determined by numBestServersChoosingFunction (in each tier differently).
Schedule loading of the segment on one server in the primary tier.
Limit: at this stage, the loading queue of any server can't contain more than maxSegmentsInNodeLoadingQueue * loadingMinPercent / 100 segments that were scheduled to be loaded on the server during one of the "loading" stages of the algorithm, including this stage. This may include segment loading requests hanging since the previous Coordinator run (see #7159). Below, this kind of "typed" node loading queue limit is shortly identified as (relative).
If a segment can't be loaded on any server in the "primary" tier because the loading queues of all B "best" servers are already sufficiently full with "loading" requests, step 3 is repeated for non-primary tiers.
After this stage, unreplicated segments are split into the following groups:
ToBecomeFullyReplicated (per tier)
ToBecomeUnderreplicated (per tier)
StillUnreplicated
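The "(relative)" limit used throughout the algorithm can be sketched as a simple admission check (illustrative; the real check would live in the LoadQueuePeon):

```python
def can_enqueue(queued_requests_of_type, max_segments_in_node_loading_queue,
                min_percent):
    """A '(relative)' limit: a server's queue may hold at most minPercent
    percent of maxSegmentsInNodeLoadingQueue requests of a given type
    (e.g. 'loading' requests during the loading stages), including requests
    hanging since the previous Coordinator run."""
    limit = max_segments_in_node_loading_queue * min_percent / 100
    return queued_requests_of_type < limit
```

With maxSegmentsInNodeLoadingQueue = 100 and loadingMinPercent = 70, a server stops accepting "loading" requests at this stage once 70 such requests are queued.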
2. Underbroadcasted segments
A "loading" stage.
In every tier where the segment is underbroadcasted, schedule loading of the segment on all servers in the tier where the segment is not currently loaded.
Limit: loadingMinPercent (relative).
After this stage, underbroadcasted segments are split into the following groups:
ToBecomeFullyBroadcasted (per tier)
StillUnderbroadcasted (per tier)
3. Underreplicated segments
A "loading" stage.
The following steps are performed in every tier where a segment is underreplicated:
1. Determine top B "best" servers in the tier for the segment.
2. Schedule loading of the segment on as many servers (among those determined on the previous step) as needed to match the target replication count in the tier. If the segment is also overreplicated in some other tier, add a callback to schedule unloading the segment from one of the servers where it is currently loaded in the other tier, beyond the target replication count in that tier, upon successful loading in the tier where the segment is underreplicated. (If there is more than one server where the segment is loaded in the tier where it is overreplicated and the target replication count in that tier is greater than zero, determining the server to unload the segment from may require consulting CostBalancerStrategy.)
Limit: loadingMinPercent (relative).
After this stage, underreplicated segments are split into the following groups:
ToBecomeFullyReplicated (per tier). These per-tier groups are initialized during stage 1, segments are added to the existing groups.
StillUnderreplicated (per tier)
4. ToBecomeUnderreplicated segments
A "loading" stage.
In every tier where a segment is underreplicated, schedule loading of the segment on as many servers (among the top B "best" servers determined for the segment during stage 1) as needed to match the target replication count in the tier.
Limit: loadingMinPercent (relative).
After this stage, ToBecomeUnderreplicated segments are split into the following groups:
ToBecomeFullyReplicated (per tier). These per-tier groups are initialized during stage 1, segments are added to the existing groups.
ToBecomeUnderreplicated (per tier). In other words, segments that weren't scheduled for loading on enough servers in the tier due to the loadingMinPercent limit remain in the group where they were at the beginning of this stage.
5. Move replicated segments away from decommissioning servers
A "decommissioning" stage.
The following steps are performed in every tier, for every replicated segment loaded on one of the decommissioning servers in the tier:
1. Determine top B "best" servers in the tier for the segment, or reuse the top B servers already computed during stage 3 if the segment is underreplicated in the tier.
2. Schedule loading of the segment on the servers determined on the previous step. Add a callback to schedule unloading the segment from one of the decommissioning servers upon successful loading. (This type of operation is currently implemented in DruidCoordinator.moveSegment().)
Limit: decommissioningMinPercent (relative).
After this stage, ToMoveAwayFromDecommServers segments are split into the following groups:
ToBecomeNotLoadedOnDecommServers (per tier)
StillLoadedOnDecommServers (per tier)
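The load-then-drop-on-success move (what DruidCoordinator.moveSegment() does today) might be sketched like this; the LoadQueuePeon stand-in below is a deliberately simplified, synchronous stub:

```python
class LoadQueuePeonStub:
    """Minimal stand-in for a per-server loading queue (illustrative only)."""
    def __init__(self):
        self.loaded = set()

    def load(self, segment, request_type, on_success=None):
        # In reality the historical loads asynchronously; here the load
        # completes immediately to show the callback ordering.
        self.loaded.add(segment)
        if on_success is not None:
            on_success()

    def drop(self, segment):
        self.loaded.discard(segment)


def move_segment(segment, source_peon, dest_peon, request_type):
    # Load on the destination first; drop the source replica only in the
    # success callback, so the replica count never dips below target while
    # the segment is in flight. request_type is 'loading', 'decommissioning',
    # or 'balancing'.
    dest_peon.load(segment, request_type,
                   on_success=lambda: source_peon.drop(segment))
```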
6. First revisit of loading segments
A "loading" stage.
Repeat:
Stage 1 for StillUnreplicated segments,
Stage 2 for StillUnderbroadcasted segments,
Stage 3 for StillUnderreplicated segments,
Stage 4 for ToBecomeUnderreplicated segments,
With the difference that the computed top B "best" servers are reused from the previous stages, and with a different limit (otherwise this stage wouldn't make much sense):
Limit: at this stage, the loading queue of any server can't contain more than maxSegmentsInNodeLoadingQueue * (loadingMinPercent + decommissioningMinPercent) / 100 segments. This may include segment loading requests hanging since the previous Coordinator run.
7. Segment balancing
A "balancing" stage.
There is a loop performed for every tier in the cluster. Steps of the loop:
1. Choose a segment randomly among fully replicated and underreplicated segments (as of the beginning of the algorithm run) that need to be loaded in the tier.
2. If the chosen segment has already been visited during this loop for the tier, mark this iteration as "failed" and proceed to the next iteration.
3. Determine top B "best" servers in the tier for the segment, or reuse the top B servers already computed during stage 3 or 5.
4. If the segment shouldn't be moved because the servers where it is currently loaded are all among the top B "best" servers for the segment, mark this iteration as "failed" and proceed to the next iteration.
Alternative design: we may decide that a segment movement is worthwhile if any server (among the top B) where the segment is not loaded is "better" than some server where the segment is currently loaded, rather than only if one of the servers where the segment is currently loaded is out of the top B "best" servers. However, avoiding "too precise" balancing movements, like moving a segment from the server ranked second to the server ranked first by cost (according to CostBalancerStrategy), leaves more room for segment movements of relatively higher importance before the loading queues of the servers fill up.
5. If the segment can't be moved because the loading queues of all destination servers (those among the top B, while some servers where the segment is currently loaded are out of the top B) are already sufficiently full (balancingMinPercent (relative)), add the segment to the ToBeBalanced group (for the tier), mark the iteration as "failed", and proceed to the next iteration.
6. Mark the iteration as "successful" and schedule a segment movement (like on stage 5, step 2).
The loop exits when, among the last 100 iterations, at least 80 are "failed".
Limit: balancingMinPercent (relative).
Comment: the 80/100 "failed" iterations condition replaces the static bound maxSegmentsToMove. Its advantage is that this condition is independent of the tier size. It should work for all tiers, from those containing a single server (then the first 100 iterations will all fail and the loop will exit) to hundreds of servers. I don't know whether 80 is the right threshold (and whether it should be different in tiers of different sizes) because we don't have #5987 running in our clusters yet, so we don't have good visibility into real "moved" and "unmoved" counts now. I need some input from the community to determine that.
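The exit condition can be sketched with a sliding window over iteration outcomes (the 80/100 thresholds are the proposal's; the loop body is a placeholder):

```python
from collections import deque

def run_balancing_loop(try_move_one_segment, window=100, max_failed=100 - 20):
    """Run balancing iterations until, among the last `window` iterations,
    at least `max_failed` are 'failed'. try_move_one_segment() returns True
    on a successful move, False on a 'failed' iteration. Returns the number
    of scheduled moves."""
    outcomes = deque(maxlen=window)  # sliding window of recent outcomes
    moved = 0
    while True:
        success = try_move_one_segment()
        outcomes.append(success)
        moved += int(success)
        failed = len(outcomes) - sum(outcomes)
        if len(outcomes) == window and failed >= max_failed:
            return moved
```

In a single-server tier, where every iteration fails, the loop exits after exactly the first 100 iterations, matching the comment above.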
8. Second revisit of loading segments
A "loading" stage.
Repeat stage 6, with the difference that the loading queues of all servers can contain up to maxSegmentsInNodeLoadingQueue segments at this stage. This limit is also implicitly in place during all previous stages. This type of limit is shortly identified as full maxSegmentsInNodeLoadingQueue below.
9. StillLoadedOnDecommServers segments
A "decommissioning" stage.
Repeat stage 5 for StillLoadedOnDecommServers segments, with a different limit: decommissioningMaxPercent (relative).
10. ToBeBalanced segments
A "balancing" stage.
In every tier, for every ToBeBalanced segment in that tier, schedule as many movements of the segment as possible from the servers where the segment is currently loaded that are out of the top B best servers in the tier for the segment to one of the servers within the top B.
Limit: full maxSegmentsInNodeLoadingQueue.
11. Repeat segment balancing
Repeat stage 7 with a different limit: full maxSegmentsInNodeLoadingQueue. A segment is considered already visited on step 2 if it was visited during stage 7. Also, during this stage, segments don't need to be added to the ToBeBalanced group.
12. Unload excessively overreplicated segments
For every segment, if the total replication count in all tiers exceeds the sum of target replication counts in all tiers, schedule unloading of the segment from as many servers in the tiers where the segment is overreplicated as needed to match the total replication count with the sum of target replication counts. If there is more than one server where the segment is loaded in any tier where it is overreplicated and the target replication count in that tier is greater than zero, determining the servers to unload the segment from may require consulting CostBalancerStrategy.
Note that the replicated segments to which one of DropRules currently applies should be unloaded during this stage.
"Non-excessively overreplicated" segments should be scheduled for unloading in callbacks during stage 3.
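The stage-12 excess is just arithmetic over per-tier counts (illustrative sketch):

```python
def num_replicas_to_drop(target_replicants, loaded_replicants):
    """Stage 12: if the total replication count across all tiers exceeds the
    sum of target replication counts, that many replicas should be scheduled
    for unloading from tiers where the segment is overreplicated."""
    total_loaded = sum(loaded_replicants.values())
    total_target = sum(target_replicants.values())
    return max(0, total_loaded - total_target)
```

For example, with targets {hot: 2, cold: 1} and {hot: 3, cold: 1} actually loaded, one replica should be dropped from the hot tier.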
13. Unload broadcasted segments from decommissioning servers
For every decommissioning server that is currently holding only broadcasted segments, schedule unloading all these segments from the server.
Rationale
Compared to the current behavior of Coordinator, the proposed algorithm prioritizes availability as much as possible in the face of abrupt changes in the cluster, for example, when a lot of Historical nodes become unavailable and a lot of segments become unreplicated because of that. During stage 1, only a single replica for every unreplicated segment is scheduled for loading, giving a chance for other unreplicated segments to be scheduled for loading before loading queues of all "good" servers are filled up.
Note that the top B "best" servers are reused between the stages without recomputing, although some loading decisions that happen in between may affect the scores of the servers. This avoids repeating expensive CostBalancerStrategy computations many times for every segment: in the proposed algorithm, a segment may be visited up to five times during different stages in the context of a single tier (an underreplicated segment can be visited during stages 3, 6, 7, 8, and 10). I think this shouldn't affect the quality of cost-based balancing much; however, this opinion is not grounded in anything.
Implementation notes
I don't have a firm understanding or confidence in what I write below in this section because I didn't advance much in the implementation yet.
To handle many different groups of segments (up to dozens, depending on the number of tiers in the cluster), I plan to use bitsets. The index corresponds to the position of the segment in DruidCoordinatorRuntimeParams.availableSegments (to be renamed to usedSegments in #7306). To allow efficient indexing, this data structure is turned into a sorted array instead of a TreeSet. set.contains()-like operations are implemented via Arrays.binarySearch(). (This is pretty much how Guava's ImmutableSortedSet works.)
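A Python stand-in for the planned bitset-over-sorted-array representation (the Java version would use a sorted array plus Arrays.binarySearch() and a long[] bitset; this sketch only illustrates the idea):

```python
from bisect import bisect_left

class SegmentGroup:
    """A group of segments (e.g. StillUnreplicated) represented as a bitset
    indexed by a segment's position in the sorted array of all used segments."""
    def __init__(self, sorted_segment_ids):
        self.sorted_ids = sorted_segment_ids  # sorted array, not a TreeSet
        self.bits = bytearray((len(sorted_segment_ids) + 7) // 8)

    def _index_of(self, segment_id):
        # Arrays.binarySearch() equivalent.
        i = bisect_left(self.sorted_ids, segment_id)
        if i == len(self.sorted_ids) or self.sorted_ids[i] != segment_id:
            raise KeyError(segment_id)
        return i

    def add(self, segment_id):
        i = self._index_of(segment_id)
        self.bits[i // 8] |= 1 << (i % 8)

    def contains(self, segment_id):
        i = self._index_of(segment_id)
        return bool(self.bits[i // 8] & (1 << (i % 8)))
```

Membership tests cost one binary search plus one bit probe, and dozens of groups share the single sorted array.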
LoadQueuePeon implementations should internally handle three different types of segment loading requests: "loading", "decommissioning", and "balancing".
As I mentioned in #7159, SegmentReplicantLookup should become a concurrent data structure.
Operational impact
I think the new algorithm should become the default immediately, but I'll keep the old version of the code and provide an option to switch to the old implementation in case the new algorithm fails in production due to some unforeseen reasons or bugs.
It doesn't seem reasonable to me to keep the old implementation around for more than one Druid release.
FYI @egor-ryashin @gianm @clintropolis
FYI @jihoonson, see the parts of the algorithm related to BroadcastDistributionRules.