Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class CoordinatorDynamicConfig
private final int replicationThrottleLimit;
private final int balancerComputeThreads;
private final boolean emitBalancingStats;
private final boolean useRoundRobinSegmentAssignment;

/**
* List of specific data sources for which kill tasks are sent in {@link KillUnusedSegments}.
Expand Down Expand Up @@ -134,7 +135,8 @@ public CoordinatorDynamicConfig(
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout,
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad,
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
Expand Down Expand Up @@ -195,6 +197,12 @@ public CoordinatorDynamicConfig(
"maxNonPrimaryReplicantsToLoad must be greater than or equal to 0."
);
this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;

if (useRoundRobinSegmentAssignment == null) {
this.useRoundRobinSegmentAssignment = Builder.DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT;
} else {
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
}
}

private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray)
Expand Down Expand Up @@ -316,6 +324,12 @@ public int getMaxSegmentsInNodeLoadingQueue()
return maxSegmentsInNodeLoadingQueue;
}

@JsonProperty
public boolean isUseRoundRobinSegmentAssignment()
{
return useRoundRobinSegmentAssignment;
}

/**
* List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning'
* servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate
Expand Down Expand Up @@ -509,6 +523,7 @@ public static class Builder
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
private static final int DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD = Integer.MAX_VALUE;
private static final boolean DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT = false;

private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
private Long mergeBytesLimit;
Expand All @@ -528,6 +543,7 @@ public static class Builder
private Boolean pauseCoordination;
private Boolean replicateAfterLoadTimeout;
private Integer maxNonPrimaryReplicantsToLoad;
private Boolean useRoundRobinSegmentAssignment;

public Builder()
{
Expand All @@ -554,7 +570,8 @@ public Builder(
@Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout,
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad
@JsonProperty("maxNonPrimaryReplicantsToLoad") @Nullable Integer maxNonPrimaryReplicantsToLoad,
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
Expand All @@ -576,6 +593,7 @@ public Builder(
this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
this.maxNonPrimaryReplicantsToLoad = maxNonPrimaryReplicantsToLoad;
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
}

public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis)
Expand Down Expand Up @@ -681,6 +699,12 @@ public Builder withMaxNonPrimaryReplicantsToLoad(int maxNonPrimaryReplicantsToLo
return this;
}

public Builder withUseRoundRobinSegmentAssignment(boolean useRoundRobinSegmentAssignment)
{
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
return this;
}

public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
Expand Down Expand Up @@ -709,7 +733,8 @@ public CoordinatorDynamicConfig build()
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination,
replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
: maxNonPrimaryReplicantsToLoad
: maxNonPrimaryReplicantsToLoad,
useRoundRobinSegmentAssignment == null ? DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
);
}

Expand Down Expand Up @@ -747,7 +772,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
maxNonPrimaryReplicantsToLoad == null
? defaults.getMaxNonPrimaryReplicantsToLoad()
: maxNonPrimaryReplicantsToLoad
: maxNonPrimaryReplicantsToLoad,
useRoundRobinSegmentAssignment == null ? defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,10 +993,19 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)

stopPeonsForDisappearedServers(currentServers);

final RoundRobinServerSelector roundRobinServerSelector;
if (params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()) {
roundRobinServerSelector = new RoundRobinServerSelector(cluster);
log.info("Using round-robin segment assignment.");
} else {
roundRobinServerSelector = null;
}

return params.buildFromExisting()
.withDruidCluster(cluster)
.withLoadManagementPeons(loadManagementPeons)
.withSegmentReplicantLookup(segmentReplicantLookup)
.withRoundRobinServerSelector(roundRobinServerSelector)
.build();
}

Expand Down Expand Up @@ -1044,7 +1053,8 @@ DruidCluster prepareCluster(DruidCoordinatorRuntimeParams params, List<Immutable
new ServerHolder(
server,
loadManagementPeons.get(server.getName()),
decommissioningServers.contains(server.getHost())
decommissioningServers.contains(server.getHost()),
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ private static TreeSet<DataSegment> createUsedSegmentsSet(Iterable<DataSegment>
private final CoordinatorStats stats;
private final BalancerStrategy balancerStrategy;
private final Set<String> broadcastDatasources;
private final @Nullable RoundRobinServerSelector roundRobinServerSelector;

private DruidCoordinatorRuntimeParams(
long startTimeNanos,
Expand All @@ -80,6 +81,7 @@ private DruidCoordinatorRuntimeParams(
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
RoundRobinServerSelector roundRobinServerSelector,
ServiceEmitter emitter,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
Expand All @@ -96,6 +98,7 @@ private DruidCoordinatorRuntimeParams(
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.loadManagementPeons = loadManagementPeons;
this.replicationManager = replicationManager;
this.roundRobinServerSelector = roundRobinServerSelector;
this.emitter = emitter;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
Expand Down Expand Up @@ -150,6 +153,12 @@ public ReplicationThrottler getReplicationManager()
return replicationManager;
}

@Nullable
public RoundRobinServerSelector getRoundRobinServerSelector()
{
return roundRobinServerSelector;
}

public ServiceEmitter getEmitter()
{
return emitter;
Expand Down Expand Up @@ -211,6 +220,7 @@ public Builder buildFromExisting()
dataSourcesSnapshot,
loadManagementPeons,
replicationManager,
roundRobinServerSelector,
emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
Expand All @@ -231,6 +241,7 @@ public Builder buildFromExistingWithoutSegmentsMetadata()
null, // dataSourcesSnapshot
loadManagementPeons,
replicationManager,
roundRobinServerSelector,
emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
Expand All @@ -250,6 +261,7 @@ public static class Builder
private @Nullable DataSourcesSnapshot dataSourcesSnapshot;
private final Map<String, LoadQueuePeon> loadManagementPeons;
private ReplicationThrottler replicationManager;
private @Nullable RoundRobinServerSelector roundRobinServerSelector;
private ServiceEmitter emitter;
private CoordinatorDynamicConfig coordinatorDynamicConfig;
private CoordinatorCompactionConfig coordinatorCompactionConfig;
Expand All @@ -267,6 +279,7 @@ private Builder()
this.dataSourcesSnapshot = null;
this.loadManagementPeons = new HashMap<>();
this.replicationManager = null;
this.roundRobinServerSelector = null;
this.emitter = null;
this.stats = new CoordinatorStats();
this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
Expand All @@ -283,6 +296,7 @@ private Builder()
@Nullable DataSourcesSnapshot dataSourcesSnapshot,
Map<String, LoadQueuePeon> loadManagementPeons,
ReplicationThrottler replicationManager,
RoundRobinServerSelector roundRobinServerSelector,
ServiceEmitter emitter,
CoordinatorDynamicConfig coordinatorDynamicConfig,
CoordinatorCompactionConfig coordinatorCompactionConfig,
Expand All @@ -299,6 +313,7 @@ private Builder()
this.dataSourcesSnapshot = dataSourcesSnapshot;
this.loadManagementPeons = loadManagementPeons;
this.replicationManager = replicationManager;
this.roundRobinServerSelector = roundRobinServerSelector;
this.emitter = emitter;
this.coordinatorDynamicConfig = coordinatorDynamicConfig;
this.coordinatorCompactionConfig = coordinatorCompactionConfig;
Expand All @@ -319,6 +334,7 @@ public DruidCoordinatorRuntimeParams build()
dataSourcesSnapshot,
loadManagementPeons,
replicationManager,
roundRobinServerSelector,
emitter,
coordinatorDynamicConfig,
coordinatorCompactionConfig,
Expand Down Expand Up @@ -401,6 +417,12 @@ public Builder withReplicationManager(ReplicationThrottler replicationManager)
return this;
}

public Builder withRoundRobinServerSelector(RoundRobinServerSelector roundRobinServerSelector)
{
this.roundRobinServerSelector = roundRobinServerSelector;
return this;
}

public Builder withEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
Expand Down
Loading