Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none|
|`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
|`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 100. |100|
|`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
|`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70|
|`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public class CoordinatorDynamicConfig
private final int balancerComputeThreads;
private final boolean emitBalancingStats;

/** If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources. */
/**
* If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources.
*/
private final boolean killUnusedSegmentsInAllDataSources;

/**
Expand All @@ -74,7 +76,7 @@ public class CoordinatorDynamicConfig
/**
* Stale pending segments belonging to the data sources in this list are not killed by {@link
* KillStalePendingSegments}. In other words, segments in these data sources are "protected".
*
* <p>
* Pending segments are considered "stale" when their created_time is older than {@link
* KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
*/
Expand Down Expand Up @@ -134,7 +136,7 @@ public CoordinatorDynamicConfig(
// Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is
// updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152
@JsonProperty("killPendingSegmentsSkipList") Object dataSourcesToNotKillStalePendingSegmentsIn,
@JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
@JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
@JsonProperty("decommissioningNodes") Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") boolean pauseCoordination,
Expand All @@ -149,11 +151,12 @@ public CoordinatorDynamicConfig(
this.maxSegmentsToMove = maxSegmentsToMove;

if (percentOfSegmentsToConsiderPerMove == null) {
log.debug("percentOfSegmentsToConsiderPerMove was null! This is likely because your metastore does not "
+ "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
+ "to the Druid default of %f. It is recommended that you re-submit your dynamic config with your "
+ "desired value for percentOfSegmentsToConsideredPerMove",
Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
log.debug(
"percentOfSegmentsToConsiderPerMove was null! This is likely because your metastore does not "
+ "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
+ "to the Druid default of %f. It is recommended that you re-submit your dynamic config with your "
+ "desired value for percentOfSegmentsToConsideredPerMove",
Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
);
percentOfSegmentsToConsiderPerMove = Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE;
}
Expand All @@ -172,7 +175,9 @@ public CoordinatorDynamicConfig(
this.specificDataSourcesToKillUnusedSegmentsIn = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
this.dataSourcesToNotKillStalePendingSegmentsIn =
parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null
? Builder.DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
: maxSegmentsInNodeLoadingQueue;
this.decommissioningNodes = parseJsonStringOrArray(decommissioningNodes);
Preconditions.checkArgument(
decommissioningMaxPercentOfMaxSegmentsToMove >= 0 && decommissioningMaxPercentOfMaxSegmentsToMove <= 100,
Expand Down Expand Up @@ -517,7 +522,7 @@ public static class Builder
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
Expand Down Expand Up @@ -732,7 +737,8 @@ public CoordinatorDynamicConfig build()
: decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination,
replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD : maxNonPrimaryReplicantsToLoad
maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
: maxNonPrimaryReplicantsToLoad
);
}

Expand All @@ -745,7 +751,9 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit,
mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit,
maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove,
percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove,
percentOfSegmentsToConsiderPerMove == null
? defaults.getPercentOfSegmentsToConsiderPerMove()
: percentOfSegmentsToConsiderPerMove,
useBatchedSegmentSampler == null ? defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime,
replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
Expand All @@ -769,7 +777,9 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
: decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination,
replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null ? defaults.getMaxNonPrimaryReplicantsToLoad() : maxNonPrimaryReplicantsToLoad
maxNonPrimaryReplicantsToLoad == null
? defaults.getMaxNonPrimaryReplicantsToLoad()
: maxNonPrimaryReplicantsToLoad
);
}
}
Expand Down
Loading