Merged

31 commits
614a96b
dynamic coord config adding more balancing control
Aug 13, 2020
f7c53cd
fix checkstyle failure
Aug 14, 2020
e1d4765
Make doc more detailed for admin to understand when/why to use new co…
Sep 2, 2020
f525f0a
Merge branch 'master' into bounded-segment-balancer
Sep 2, 2020
0ff7c5a
refactor PR to use a % of segments instead of raw number
Sep 3, 2020
f3993e3
update the docs
Sep 3, 2020
3b04213
remove bad doc line
Sep 3, 2020
d69897e
fix typo in name of new dynamic config
Sep 3, 2020
213599a
update RservoirSegmentSampler to gracefully deal with values > 100%
Sep 3, 2020
8e5a1df
add handler for <= 0 in ReservoirSegmentSampler
Sep 3, 2020
b3f680d
fixup CoordinatorDynamicConfigTest naming and argument ordering
Sep 3, 2020
a8a7c42
fix items in docs after spellcheck flags
Sep 8, 2020
fb20cb2
Fix lgtm flag on missing space in string literal
Sep 8, 2020
fd221c8
Merge branch 'master' into bounded-segment-balancer
Oct 29, 2020
85660be
improve documentation for new config
Oct 29, 2020
f899ade
Add default value to config docs and add advice in cluster tuning doc
Oct 30, 2020
848ac00
Add percentOfSegmentsToConsiderPerMove to web console coord config di…
Oct 30, 2020
982b016
update jest snapshot after console change
Nov 3, 2020
cce72eb
fix spell checker errors
Nov 3, 2020
1c8e9da
Improve debug logging in getRandomSegmentBalancerHolder to cover all …
Nov 3, 2020
48f0a92
Merge branch 'master' into bounded-segment-balancer
Dec 18, 2020
bf59074
add new config back to web console module after merge with master
Dec 18, 2020
dc3f80a
fix ReservoirSegmentSamplerTest
Dec 18, 2020
9ac54ff
fix line breaks in coordinator console dialog
Dec 18, 2020
9197bb0
Add a test that helps ensure not regressions for percentOfSegmentsToC…
Dec 19, 2020
6f24d44
Make improvements based off of feedback in review
Dec 19, 2020
62962e0
additional cleanup coming from review
Dec 19, 2020
93bed10
Add a warning log if limit on segments to consider for move can't be …
Dec 21, 2020
d76c924
remove unused import
Dec 21, 2020
e30ddf3
fix tests for CoordinatorDynamicConfig
Dec 21, 2020
43766ed
remove precondition test that is redundant in CoordinatorDynamicConfi…
Dec 21, 2020
2 changes: 2 additions & 0 deletions docs/configuration/index.md
@@ -746,6 +746,7 @@ A sample Coordinator dynamic config JSON object is shown below:
"mergeBytesLimit": 100000000,
"mergeSegmentsLimit" : 1000,
"maxSegmentsToMove": 5,
"percentOfSegmentsToConsiderPerMove": 100,
"replicantLifetime": 15,
"replicationThrottleLimit": 10,
"emitBalancingStats": false,
@@ -764,6 +765,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L|
|`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100|
|`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5|
|`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by ascending available capacity (least available capacity first) and then iterates over each server's segments, considering them for moving. The default of 100% means that every segment on every server is a candidate to be moved. This is reasonable for most small to medium-sized clusters. However, an administrator may prefer a lower value if it is not worthwhile to consider every single segment in the cluster each time the Coordinator looks for a segment to move.|100|
|`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15|
|`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
|`balancerComputeThreads`|Thread pool size for computing moving cost of segments in segment balancing. Consider increasing this if you have a lot of segments and moving segments starts to get stuck.|1|
17 changes: 17 additions & 0 deletions docs/operations/basic-cluster-tuning.md
@@ -280,6 +280,23 @@ The heap requirements of the Coordinator scale with the number of servers, segme

You can set the Coordinator heap to the same size as your Broker heap, or slightly smaller: both services have to process cluster-wide state and answer API requests about this state.

#### Dynamic Configuration

`percentOfSegmentsToConsiderPerMove`
* The default value is 100. This means that the Coordinator considers all segments when it looks for a segment to move. The Coordinator makes a weighted choice, with segments on servers with the least available capacity being the most likely to be moved.
* This weighted selection strategy means that segments on the servers with the most available capacity are the least likely to be chosen.
* As the number of segments in the cluster grows, the probability of choosing the Nth segment to move decreases, where N is the last segment considered for moving.
* An admin can use this config to skip consideration of those unlikely segments.
* Rather than skipping a precise number of segments, this config skips a percentage of the segments in the cluster.
* For example, with the value set to 25, only the first 25% of segments are considered as candidates to be moved. This 25% of segments comes from the servers that have the least available capacity.
* In this example, each time the Coordinator looks for a segment to move, it considers 75% fewer segments than it did when the configuration was 100. On clusters with hundreds of thousands of segments, this can add up to meaningful savings in coordination time.
* General recommendations for this configuration:
  * If you are not worried about how long your Coordinator takes to complete a full coordination cycle, you likely do not need to modify this config.
  * If full coordination cycles take longer than you would like, and you have set the Coordinator dynamic config `maxSegmentsToMove` to a value above 0 (the default is 5), setting this config to a non-default value can help shorten coordination time.
  * The recommended starting point is 66. It represents a meaningful decrease in the percentage of segments considered while not being too aggressive (you consider one third fewer segments per move operation at this value).
  * The impact of this config on coordination time is a function of how low you set the value, the value of `maxSegmentsToMove`, and the total number of segments in your cluster.
  * If your cluster has a relatively small number of segments, or you choose to move few segments per coordination cycle, there may not be much savings to be had here.
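As a rough, standalone sketch (illustrative code, not part of Druid) of the arithmetic behind these savings: the sampler caps its scan at `ceil(totalSegments * percent / 100)` segments, and resets out-of-range percentages to 100.

```java
public class SegmentConsiderationLimit
{
  // Mirrors the limit arithmetic described above: the number of segments the
  // Coordinator scans before short-circuiting a move-candidate search.
  static int calculatedSegmentLimit(int totalSegments, double percentOfSegmentsToConsider)
  {
    // Out-of-range values are treated as 100 (consider everything).
    if (percentOfSegmentsToConsider <= 0 || percentOfSegmentsToConsider > 100) {
      percentOfSegmentsToConsider = 100;
    }
    return (int) Math.ceil(totalSegments * (percentOfSegmentsToConsider / 100.0));
  }

  public static void main(String[] args)
  {
    // 400,000 segments at the recommended starting point of 66:
    System.out.println(calculatedSegmentLimit(400_000, 66.0)); // 264000
    // The default of 100 scans everything:
    System.out.println(calculatedSegmentLimit(400_000, 100.0)); // 400000
  }
}
```

So at 66, a 400,000-segment cluster scans at most 264,000 segments per move search instead of all 400,000.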

### Overlord

The main performance-related setting on the Overlord is the heap size.
@@ -63,11 +63,20 @@ public interface BalancerStrategy
* NOTE: this should really be handled on a per-segment basis, to properly support
* the interval or period-based broadcast rules. For simplicity of the initial
* implementation, only forever broadcast rules are supported.
* @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when
* choosing which segment to move. {@link CoordinatorDynamicConfig} defines a
* config percentOfSegmentsToConsiderPerMove that will be used as an argument
* for implementations of this method.
*
* @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
there are no segments to pick from (i.e. all provided serverHolders are empty).
*/
@Nullable
BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources);
BalancerSegmentHolder pickSegmentToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
);

/**
* Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first
@@ -53,6 +53,7 @@ public class CoordinatorDynamicConfig
private final long mergeBytesLimit;
private final int mergeSegmentsLimit;
private final int maxSegmentsToMove;
private final double percentOfSegmentsToConsiderPerMove;
private final int replicantLifetime;
private final int replicationThrottleLimit;
private final int balancerComputeThreads;
@@ -95,6 +96,7 @@ public CoordinatorDynamicConfig(
@JsonProperty("mergeBytesLimit") long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") int maxSegmentsToMove,
@JsonProperty("percentOfSegmentsToConsiderPerMove") double percentOfSegmentsToConsiderPerMove,
@JsonProperty("replicantLifetime") int replicantLifetime,
@JsonProperty("replicationThrottleLimit") int replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") int balancerComputeThreads,
@@ -123,6 +125,13 @@ public CoordinatorDynamicConfig(
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;

Preconditions.checkArgument(
percentOfSegmentsToConsiderPerMove > 0 && percentOfSegmentsToConsiderPerMove <= 100,
"percentOfSegmentsToConsiderPerMove should be greater than 0 and less than or equal to 100!"
);
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;

this.replicantLifetime = replicantLifetime;
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = Math.max(balancerComputeThreads, 1);
@@ -211,6 +220,12 @@ public int getMaxSegmentsToMove()
return maxSegmentsToMove;
}

@JsonProperty
public double getPercentOfSegmentsToConsiderPerMove()
{
return percentOfSegmentsToConsiderPerMove;
}

@JsonProperty
public int getReplicantLifetime()
{
@@ -302,6 +317,7 @@ public String toString()
", mergeBytesLimit=" + mergeBytesLimit +
", mergeSegmentsLimit=" + mergeSegmentsLimit +
", maxSegmentsToMove=" + maxSegmentsToMove +
", percentOfSegmentsToConsiderPerMove=" + percentOfSegmentsToConsiderPerMove +
", replicantLifetime=" + replicantLifetime +
", replicationThrottleLimit=" + replicationThrottleLimit +
", balancerComputeThreads=" + balancerComputeThreads +
@@ -341,6 +357,9 @@ public boolean equals(Object o)
if (maxSegmentsToMove != that.maxSegmentsToMove) {
return false;
}
if (percentOfSegmentsToConsiderPerMove != that.percentOfSegmentsToConsiderPerMove) {
return false;
}
if (replicantLifetime != that.replicantLifetime) {
return false;
}
@@ -382,6 +401,7 @@ public int hashCode()
mergeBytesLimit,
mergeSegmentsLimit,
maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove,
replicantLifetime,
replicationThrottleLimit,
balancerComputeThreads,
@@ -408,6 +428,7 @@ public static class Builder
private static final long DEFAULT_MERGE_BYTES_LIMIT = 524_288_000L;
private static final int DEFAULT_MERGE_SEGMENTS_LIMIT = 100;
private static final int DEFAULT_MAX_SEGMENTS_TO_MOVE = 5;
private static final int DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE = 100;
private static final int DEFAULT_REPLICANT_LIFETIME = 15;
private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10;
private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1;
@@ -421,6 +442,7 @@ public static class Builder
private Long mergeBytesLimit;
private Integer mergeSegmentsLimit;
private Integer maxSegmentsToMove;
private Double percentOfSegmentsToConsiderPerMove;
private Integer replicantLifetime;
private Integer replicationThrottleLimit;
private Boolean emitBalancingStats;
@@ -444,6 +466,7 @@ public Builder(
@JsonProperty("mergeBytesLimit") @Nullable Long mergeBytesLimit,
@JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit,
@JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove,
@JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove,
@JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime,
@JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit,
@JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads,
@@ -463,6 +486,7 @@ public Builder(
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.maxSegmentsToMove = maxSegmentsToMove;
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
> **Contributor:** Should this have a Precondition check that it falls between 0 and 100?
>
> **Contributor (author):** Hmm, since this is a Builder class that builds a CoordinatorDynamicConfig object, I think this precondition is covered by the actual constructor for the CoordinatorDynamicConfig class. IMO, having another precondition here is redundant.
>
> **Contributor:** Makes sense to me.

this.replicantLifetime = replicantLifetime;
this.replicationThrottleLimit = replicationThrottleLimit;
this.balancerComputeThreads = balancerComputeThreads;
@@ -500,6 +524,12 @@ public Builder withMaxSegmentsToMove(int maxSegmentsToMove)
return this;
}

public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsToConsiderPerMove)
{
this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove;
return this;
}

public Builder withReplicantLifetime(int replicantLifetime)
{
this.replicantLifetime = replicantLifetime;
@@ -569,6 +599,8 @@ public CoordinatorDynamicConfig build()
mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : mergeBytesLimit,
mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : mergeSegmentsLimit,
maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove == null ? DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
: percentOfSegmentsToConsiderPerMove,
replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime,
replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads,
@@ -598,6 +630,7 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit,
mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit,
maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove,
replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime,
replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads,
@@ -214,10 +214,15 @@ static double computeJointSegmentsCost(final DataSegment segment, final Iterable
@Override
public BalancerSegmentHolder pickSegmentToMove(
final List<ServerHolder> serverHolders,
Set<String> broadcastDatasources
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
)
{
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(
serverHolders,
broadcastDatasources,
percentOfSegmentsToConsider
);
}

@Override
@@ -54,9 +54,17 @@ public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List
}

@Override
public BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders, Set<String> broadcastDatasources)
public BalancerSegmentHolder pickSegmentToMove(
List<ServerHolder> serverHolders,
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
)
{
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(serverHolders, broadcastDatasources);
return ReservoirSegmentSampler.getRandomBalancerSegmentHolder(
serverHolders,
broadcastDatasources,
percentOfSegmentsToConsider
);
}

@Override
@@ -19,6 +19,7 @@

package org.apache.druid.server.coordinator;

import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.timeline.DataSegment;

import java.util.List;
@@ -28,15 +29,54 @@
final class ReservoirSegmentSampler
{

private static final EmittingLogger log = new EmittingLogger(ReservoirSegmentSampler.class);

/**
* Iterates over segments that live on the candidate servers passed in {@link ServerHolder} and (possibly) picks a
* segment to return to the caller in a {@link BalancerSegmentHolder} object.
*
* @param serverHolders List of {@link ServerHolder} objects containing segments that are candidates to be chosen.
* @param broadcastDatasources Set of datasource names that identify broadcast datasources. We don't want to consider
* segments from these datasources.
* @param percentOfSegmentsToConsider The % of total cluster segments to consider before short-circuiting and
* returning immediately.
* @return a {@link BalancerSegmentHolder} containing the chosen segment and the server it resides on, or null if no
* segment was chosen
*/
> **Contributor:** Thanks for the doc 👍
static BalancerSegmentHolder getRandomBalancerSegmentHolder(
final List<ServerHolder> serverHolders,
Set<String> broadcastDatasources
Set<String> broadcastDatasources,
double percentOfSegmentsToConsider
)
{
ServerHolder fromServerHolder = null;
DataSegment proposalSegment = null;
int calculatedSegmentLimit = Integer.MAX_VALUE;
int numSoFar = 0;

// Reset a bad value of percentOfSegmentsToConsider to 100. We don't allow consideration less than or equal to
// 0% of segments or greater than 100% of segments.
if (percentOfSegmentsToConsider <= 0 || percentOfSegmentsToConsider > 100) {
log.warn("Resetting percentOfSegmentsToConsider to 100 because only values greater than 0 and up to 100 are allowed."
+ " You provided [%f]", percentOfSegmentsToConsider);
percentOfSegmentsToConsider = 100;
}

// Calculate the integer limit for the number of segments to be considered for moving if % is less than 100
if (percentOfSegmentsToConsider < 100) {
int totalSegments = 0;
for (ServerHolder server : serverHolders) {
totalSegments += server.getServer().getNumSegments();
}
// If totalSegments is zero, we will assume it is a mistake and move on to iteration without updating
// calculatedSegmentLimit
if (totalSegments != 0) {
calculatedSegmentLimit = (int) Math.ceil((double) totalSegments * (percentOfSegmentsToConsider / 100.0));
} else {
log.warn("Unable to calculate limit on segments to consider because ServerHolder collection indicates"
+ " zero segments existing in the cluster.");
}
}

for (ServerHolder server : serverHolders) {
if (!server.getServer().getType().isSegmentReplicationTarget()) {
// if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do
@@ -56,6 +96,19 @@ static BalancerSegmentHolder getRandomBalancerSegmentHolder(
proposalSegment = segment;
}
numSoFar++;

// We have iterated over the allotted number of segments and will return the currently proposed segment or null.
// We only break out early if we are iterating over less than 100% of the total cluster segments.
if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) {
log.debug("Breaking out of iteration over potential segments to move because we hit the limit of [%f] percent of"
+ " segments to consider for moving. Segments iterated: [%d]", percentOfSegmentsToConsider, numSoFar);
break;
}
}
// Break out of the outer loop over servers as well once the allotted number of segments has been iterated.
if (percentOfSegmentsToConsider < 100 && numSoFar >= calculatedSegmentLimit) {
break;
}
}
if (fromServerHolder != null) {
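The early-exit logic above amounts to reservoir sampling of size 1 with a cap on how many items are examined. A standalone sketch of the technique (illustrative names, not Druid's classes; assumes the percentage is already in the valid (0, 100] range):

```java
import java.util.List;
import java.util.Random;

public class CappedReservoirSampler
{
  // Classic reservoir sampling of size 1 over a stream of segment ids,
  // short-circuiting once the percentage cap is reached. Each of the first
  // `limit` items is selected with equal probability 1/limit.
  static String pickOne(List<String> segmentIds, double percentToConsider, Random rng)
  {
    int limit = (int) Math.ceil(segmentIds.size() * (percentToConsider / 100.0));
    String picked = null;
    int numSoFar = 0;
    for (String id : segmentIds) {
      // Replace the current pick with probability 1 / (numSoFar + 1).
      if (rng.nextInt(numSoFar + 1) == 0) {
        picked = id;
      }
      numSoFar++;
      if (numSoFar >= limit) {
        break; // cap reached: stop considering further segments
      }
    }
    return picked;
  }
}
```

With `percentToConsider` set to 25 over ids `s0..s99`, only `s0..s24` can ever be returned, which is why Druid orders servers by least available capacity before sampling: the capped prefix holds the segments most worth moving.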
@@ -189,7 +189,8 @@ private Pair<Integer, Integer> balanceServers(
for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(
toMoveFrom,
params.getBroadcastDatasources()
params.getBroadcastDatasources(),
params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove()
);
if (segmentToMoveHolder == null) {
log.info("All servers to move segments from are empty, ending run.");
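How `maxSegmentsToMove` and `percentOfSegmentsToConsiderPerMove` interact in the balancing loop can be sketched standalone (hypothetical names, not Druid's actual classes): each iteration draws one candidate under the percentage cap until the move budget is exhausted or no candidate is found.

```java
import java.util.List;
import java.util.Random;

public class BalanceLoopSketch
{
  // Simplified analogue of balanceServers(): repeatedly sample one move
  // candidate under the percentage cap, stopping at maxSegmentsToMove.
  static int balance(List<String> segments, int maxSegmentsToMove, double percentPerMove, Random rng)
  {
    int moved = 0;
    while (moved < maxSegmentsToMove) {
      int limit = (int) Math.ceil(segments.size() * (percentPerMove / 100.0));
      String candidate = null;
      for (int i = 0; i < limit; i++) {
        // Size-1 reservoir sample over the capped prefix.
        if (rng.nextInt(i + 1) == 0) {
          candidate = segments.get(i);
        }
      }
      if (candidate == null) {
        break; // nothing to pick from, end the run
      }
      moved++; // in Druid this would issue a move and track moved/unmoved counts
    }
    return moved;
  }
}
```

This makes the cost trade-off visible: total work per cycle is roughly `maxSegmentsToMove * limit` segment inspections, so lowering either knob shortens coordination time.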