Changes from all commits · 21 commits
6 changes: 4 additions & 2 deletions docs/configuration/index.md
@@ -936,7 +936,8 @@ A sample Coordinator dynamic config JSON object is shown below:
"decommissioningMaxPercentOfMaxSegmentsToMove": 70,
"pauseCoordination": false,
"replicateAfterLoadTimeout": false,
"maxNonPrimaryReplicantsToLoad": 2147483647
"maxNonPrimaryReplicantsToLoad": 2147483647,
"maxSegmentsToLoadPerCoordinationCycle": 2147483647
}
```

@@ -962,7 +963,8 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`decommissioningMaxPercentOfMaxSegmentsToMove`| Upper limit of segments the Coordinator can move from decommissioning servers to active non-decommissioning servers during a single run. This value is relative to the total maximum number of segments that can be moved at any given time based upon the value of `maxSegmentsToMove`.<br /><br />If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, the Coordinator does not move segments to decommissioning servers, effectively putting them in a type of "maintenance" mode. In this case, decommissioning servers do not participate in balancing or assignment by load rules. The Coordinator still considers segments on decommissioning servers as candidates to replicate on active servers.<br /><br />Decommissioning can stall if there are no available active servers to move the segments to. You can use the maximum percent of decommissioning segment movements to prioritize balancing or to decrease decommissioning time to prevent active servers from being overloaded. The value must be between 0 and 100.|70|
|`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false|
|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false|
|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster; such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time.|`Integer.MAX_VALUE`|
|`maxNonPrimaryReplicantsToLoad`|This is the maximum number of non-primary segment replicants to load per Coordination run. This number can be set to put a hard upper limit on the number of non-primary replicants loaded. It is a tool that can help prevent long delays in new data being available for query after events that require many non-primary replicants to be loaded by the cluster, such as a Historical node disconnecting from the cluster. The default value essentially means there is no limit on the number of replicants loaded per coordination cycle. If you want to use a non-default value for this config, you may want to start with it being `~20%` of the number of segments found on your Historical server with the most segments. You can use the Druid metric, `coordinator/time` with the filter `duty=org.apache.druid.server.coordinator.duty.RunRules` to see how different values of this config impact your Coordinator execution time. This configuration has no effect if the value is greater than the value of `maxSegmentsToLoadPerCoordinationCycle`.|`Integer.MAX_VALUE`|
|`maxSegmentsToLoadPerCoordinationCycle`|This is the maximum number of segments, both primary and non-primary replicants, that can be loaded per Coordination run. The default is equivalent to there being no limit. This differs from `maxNonPrimaryReplicantsToLoad` because it includes the count of primary replicants loaded in the limit. An operator may want to use this configuration to prevent the Coordinator from spinning and loading many segments that are already loaded but appeared to be unavailable due to a temporary network issue causing some number of Historical servers to have their segments go missing.|`Integer.MAX_VALUE`|
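The interaction between the two limits can be sketched as follows. This is an illustrative model, not Druid code; the class and method names (`CycleLimitSketch`, `canLoadMore`) are hypothetical:

```java
// Hypothetical sketch of how the two per-cycle load limits interact.
// maxTotal caps all segment loads in a cycle; maxNonPrimary additionally
// caps only the non-primary replicants. If maxNonPrimary > maxTotal, the
// non-primary limit can never take effect, matching the docs above.
public class CycleLimitSketch
{
    static boolean canLoadMore(
        int totalAssigned,
        int nonPrimaryAssigned,
        int maxTotal,
        int maxNonPrimary,
        boolean isPrimary
    )
    {
        if (totalAssigned >= maxTotal) {
            return false; // hard cap on all loads this cycle
        }
        // non-primary replicants are additionally capped
        return isPrimary || nonPrimaryAssigned < maxNonPrimary;
    }

    public static void main(String[] args)
    {
        // With maxTotal=10 and maxNonPrimary=100, the total cap is hit first,
        // so maxNonPrimary has no effect.
        System.out.println(canLoadMore(10, 3, 10, 100, false)); // false
        System.out.println(canLoadMore(5, 3, 10, 100, false));  // true
    }
}
```

This mirrors why the docs note that `maxNonPrimaryReplicantsToLoad` has no effect when it exceeds `maxSegmentsToLoadPerCoordinationCycle`.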


To view the audit history of Coordinator dynamic config, issue a GET request to the URL -
@@ -104,6 +104,12 @@ public class CoordinatorDynamicConfig
*/
private final int maxNonPrimaryReplicantsToLoad;

/**
* This is the upper limit on segments to load per coordination cycle. Once this limit is hit, the coordinator will
 * move on to its next task.
*/
private final int maxSegmentsToLoadPerCoordinationCycle;

private static final Logger log = new Logger(CoordinatorDynamicConfig.class);

@JsonCreator
@@ -137,7 +143,8 @@ public CoordinatorDynamicConfig(
@JsonProperty("pauseCoordination") boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout,
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad,
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
@JsonProperty("maxSegmentsToLoadPerCoordinationCycle") @Nullable Integer maxSegmentsToLoadPerCoordinationCycle
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -209,6 +216,22 @@ public CoordinatorDynamicConfig(
} else {
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
}

    if (maxSegmentsToLoadPerCoordinationCycle == null) {
      log.debug(
          "maxSegmentsToLoadPerCoordinationCycle was null! This is likely because your metastore does not "
          + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
          + "to the Druid default of %d. It is recommended that you re-submit your dynamic config with your "
          + "desired value for maxSegmentsToLoadPerCoordinationCycle",
          Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE
      );
      maxSegmentsToLoadPerCoordinationCycle = Builder.DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE;
    }
    Preconditions.checkArgument(
        maxSegmentsToLoadPerCoordinationCycle >= 0,
        "maxSegmentsToLoadPerCoordinationCycle must be greater than or equal to 0."
    );
    this.maxSegmentsToLoadPerCoordinationCycle = maxSegmentsToLoadPerCoordinationCycle;
  }

Contributor:

The debug choice seems a bit odd. On the one hand, it would seem reasonable to have defaults for values which are not provided. Ideally, if the user provides no value, then any existing value remains unchanged (though such a change is probably out of scope of this PR).

The idea is, as we add dynamic configs, the user should not have to first download all the existing settings, change the one of interest, and upload all of them. Just upload the one that needs to change and let Druid do the merge.

If we were to support the "values are optional" approach, then the user would need no warning when using it: doing so would be expected.

On the other hand, if we do require that the user specify all settings, including those added in the most recent release, then we should encourage people to update their dynamic config scripts with the new parameter, deciding on the default value they want. In that case, this error should be more than DEBUG, since debug is often turned off. Maybe WARN?

Contributor Author:

I agree that the log is useless; not sure why I continue to include it. I had a former PR where I introduced a new configuration and initially had the log as a WARN to alert the operator. However, in consultation with the reviewer, we decided that replacing a missing value with the default is "normal behavior" and thus shouldn't be logged as a warning. I don't recall exactly what my thoughts were in flipping to debug instead of just deleting.

IMO Druid should gracefully handle newly introduced configs on upgrade by slipping in the new default. If the operator wants to use a non-default value for any new configs, they can POST their desired config spec to the API directly or use the Druid console to make the update as intended. Otherwise, Druid should just handle everything quietly on deserialization.

I created #11161 a long time ago when we identified this check-for-null-and-set-default pattern as clunky. I'm not sure why we don't leverage the CoordinatorDynamicConfig#Builder to handle deserialization today. I could be missing something here that influenced the current serde, but nevertheless, I am prepping a separate PR to use the Builder instead of the actual CoordinatorDynamicConfig class for deserialization. If that gets accepted, then this whole block of code could be tossed in the dumpster.
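The "merge unset fields onto the current config" behavior raised in the review can be sketched as follows. The names here (`DynamicConfigSketch`, `merge`) are illustrative, not Druid's actual API:

```java
// Sketch of merge-on-upload semantics for a dynamic config: a null field in
// the user's submission means "leave the current value unchanged", so the
// user only uploads the keys they want to change.
public class DynamicConfigSketch
{
    final int maxSegmentsPerCycle;

    DynamicConfigSketch(int maxSegmentsPerCycle)
    {
        this.maxSegmentsPerCycle = maxSegmentsPerCycle;
    }

    static DynamicConfigSketch merge(Integer submitted, DynamicConfigSketch current)
    {
        // null means "not supplied": fall back to the current value
        return new DynamicConfigSketch(
            submitted != null ? submitted : current.maxSegmentsPerCycle
        );
    }

    public static void main(String[] args)
    {
        DynamicConfigSketch current = new DynamicConfigSketch(500);
        System.out.println(merge(null, current).maxSegmentsPerCycle); // 500
        System.out.println(merge(20, current).maxSegmentsPerCycle);   // 20
    }
}
```

This is essentially what the `Builder.build(CoordinatorDynamicConfig defaults)` overload in this PR does with its null-coalescing checks, applied field by field.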

private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray)
@@ -390,6 +413,13 @@ public int getMaxNonPrimaryReplicantsToLoad()
return maxNonPrimaryReplicantsToLoad;
}

@Min(0)
@JsonProperty
public int getMaxSegmentsToLoadPerCoordinationCycle()
{
return maxSegmentsToLoadPerCoordinationCycle;
}

@Override
public String toString()
{
@@ -413,6 +443,7 @@ public String toString()
", pauseCoordination=" + pauseCoordination +
", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout +
", maxNonPrimaryReplicantsToLoad=" + maxNonPrimaryReplicantsToLoad +
", maxSegmentsToLoadPerCoordinationCycle=" + maxSegmentsToLoadPerCoordinationCycle +
'}';
}

@@ -480,6 +511,9 @@ public boolean equals(Object o)
if (maxNonPrimaryReplicantsToLoad != that.maxNonPrimaryReplicantsToLoad) {
return false;
}
if (maxSegmentsToLoadPerCoordinationCycle != that.maxSegmentsToLoadPerCoordinationCycle) {
return false;
}
return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove;
}

@@ -503,7 +537,8 @@ public int hashCode()
decommissioningNodes,
decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination,
maxNonPrimaryReplicantsToLoad
maxNonPrimaryReplicantsToLoad,
maxSegmentsToLoadPerCoordinationCycle
);
}

@@ -531,6 +566,7 @@ public static class Builder
private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE;
private static final boolean DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT = false;
private static final int DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE = Integer.MAX_VALUE;

private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
private Long mergeBytesLimit;
@@ -551,6 +587,7 @@ public static class Builder
private Boolean replicateAfterLoadTimeout;
private Integer maxNonPrimaryReplicantsToLoad;
private Boolean useRoundRobinSegmentAssignment;
private Integer maxSegmentsToLoadPerCoordinationCycle;

public Builder()
{
@@ -578,7 +615,8 @@ public Builder(
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout,
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad,
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
@JsonProperty("maxSegmentsToLoadPerCoordinationCycle") @Nullable Integer maxSegmentsToLoadPerCoordinationCycle
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -601,6 +639,7 @@ public Builder(
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
this.maxSegmentsToLoadPerCoordinationCycle = maxSegmentsToLoadPerCoordinationCycle;
}

public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis)
@@ -713,6 +752,12 @@ public Builder withUseRoundRobinSegmentAssignment(boolean useRoundRobinSegmentAs
return this;
}

public Builder withMaxSegmentsToLoadPerCoordinationCycle(int maxSegmentsToLoadPerCoordinationCycle)
{
this.maxSegmentsToLoadPerCoordinationCycle = maxSegmentsToLoadPerCoordinationCycle;
return this;
}

public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
@@ -742,7 +787,9 @@ public CoordinatorDynamicConfig build()
replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
: maxNonPrimaryReplicantsToLoad,
useRoundRobinSegmentAssignment == null ? DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
useRoundRobinSegmentAssignment == null ? DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment,
maxSegmentsToLoadPerCoordinationCycle == null ? DEFAULT_MAX_SEGMENTS_TO_LOAD_PER_COORDINATION_CYCLE
: maxSegmentsToLoadPerCoordinationCycle
);
}

@@ -781,7 +828,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
maxNonPrimaryReplicantsToLoad == null
? defaults.getMaxNonPrimaryReplicantsToLoad()
: maxNonPrimaryReplicantsToLoad,
useRoundRobinSegmentAssignment == null ? defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
useRoundRobinSegmentAssignment == null ? defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment,
maxSegmentsToLoadPerCoordinationCycle == null ? defaults.getMaxSegmentsToLoadPerCoordinationCycle() : maxSegmentsToLoadPerCoordinationCycle
);
}
}
@@ -30,6 +30,7 @@
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ReplicationThrottler;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -122,6 +123,12 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
}

for (DataSegment segment : params.getUsedSegments()) {
      if (stats.getGlobalStat(LoadRule.AGGREGATE_ASSIGNED_COUNT)
          >= params.getCoordinatorDynamicConfig().getMaxSegmentsToLoadPerCoordinationCycle()) {
        log.info(
            "Maximum number of segments [%d] has been loaded for the current RunRules execution.",
            params.getCoordinatorDynamicConfig().getMaxSegmentsToLoadPerCoordinationCycle()
        );
        break;
      }

if (overshadowed.contains(segment)) {
// Skipping overshadowed segments
continue;
@@ -54,7 +54,8 @@
public abstract class LoadRule implements Rule
{
private static final EmittingLogger log = new EmittingLogger(LoadRule.class);
static final String ASSIGNED_COUNT = "assignedCount";
public static final String AGGREGATE_ASSIGNED_COUNT = "aggregateAssignedCount";
public static final String ASSIGNED_COUNT = "assignedCount";
static final String DROPPED_COUNT = "droppedCount";
public final String NON_PRIMARY_ASSIGNED_COUNT = "totalNonPrimaryReplicantsLoaded";
public static final String REQUIRED_CAPACITY = "requiredCapacity";
@@ -183,6 +184,7 @@ private void assign(

// numAssigned - 1 because we don't want to count the primary assignment
stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned - 1);
stats.addToGlobalStat(AGGREGATE_ASSIGNED_COUNT, numAssigned);

stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);

@@ -329,6 +331,7 @@ private void assignReplicas(
createLoadQueueSizeLimitingPredicate(segment),
segment
);
stats.addToGlobalStat(AGGREGATE_ASSIGNED_COUNT, numAssigned);
stats.addToGlobalStat(NON_PRIMARY_ASSIGNED_COUNT, numAssigned);
stats.addToTieredStat(ASSIGNED_COUNT, tier, numAssigned);
}
@@ -1444,6 +1444,55 @@ public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace()
EasyMock.verify(mockPeon);
}

@Test
public void testRunRulesMaxSegmentsToLoadLimit()
{
mockCoordinator();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
mockEmptyPeon();

EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
Collections.singletonList(
new IntervalLoadRule(
Intervals.of("2012-01-01T00:00:00.000Z/2012-01-02T00:00:00.000Z"),
ImmutableMap.of("normal", 2)
)
)).atLeastOnce();
EasyMock.replay(databaseRuleManager);

DruidCluster druidCluster = DruidClusterBuilder
.newBuilder()
.addTier(
"normal",
new ServerHolder(
new DruidServer("serverNorm", "hostNorm", null, 1000, ServerType.HISTORICAL, "normal", 0)
.toImmutableDruidServer(),
mockPeon
),
new ServerHolder(
new DruidServer("serverNorm2", "hostNorm2", null, 1000, ServerType.HISTORICAL, "normal", 0)
.toImmutableDruidServer(),
mockPeon
)
)
.build();

ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
BalancerStrategy balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);

DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy)
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToLoadPerCoordinationCycle(20).build())
.build();

DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
CoordinatorStats stats = afterParams.getCoordinatorStats();
Assert.assertEquals(20L, stats.getGlobalStat(LoadRule.AGGREGATE_ASSIGNED_COUNT));

exec.shutdown();
EasyMock.verify(mockPeon);
}

private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();