Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions docs/api-reference/dynamic-configuration-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ Host: http://ROUTER_IP:ROUTER_PORT
"useRoundRobinSegmentAssignment": true,
"smartSegmentLoading": true,
"debugDimensions": null,
"turboLoadingNodes": []
"turboLoadingNodes": [],
"cloneServers": {}

}
```

Expand Down Expand Up @@ -174,7 +176,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" \
"replicateAfterLoadTimeout": false,
"maxNonPrimaryReplicantsToLoad": 2147483647,
"useRoundRobinSegmentAssignment": true,
"turboLoadingNodes": []
"turboLoadingNodes": [],
"cloneServers": {}
}'
```

Expand Down Expand Up @@ -206,7 +209,8 @@ Content-Length: 683
"replicateAfterLoadTimeout": false,
"maxNonPrimaryReplicantsToLoad": 2147483647,
"useRoundRobinSegmentAssignment": true,
"turboLoadingNodes": []
"turboLoadingNodes": [],
"cloneServers": {}
}
```

Expand Down
1 change: 1 addition & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,7 @@ The following table shows the dynamic configuration properties for the Coordinat
|`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
|`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false|
|`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.<br/>Please use this config with caution. All servers should eventually be removed from this list once the segment loading on the respective historicals is finished. |none|
|`cloneServers`| Experimental. Map from target Historical server to source Historical server which should be cloned by the target. The target Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact copy of the source. Segments on the target Historical do not count towards replica counts either. If the source disappears, the target remains in the last known state of the source server until removed from the configuration. <br/>Use this config with caution. All servers should eventually be removed from this list once the desired state on the respective Historicals is achieved. |none|

##### Smart segment loading

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class CoordinatorDynamicConfig
private final Map<Dimension, String> validDebugDimensions;

private final Set<String> turboLoadingNodes;
private final Map<String, String> cloneServers;

/**
* Stale pending segments belonging to the data sources in this list are not killed by {@code
Expand Down Expand Up @@ -124,7 +125,8 @@ public CoordinatorDynamicConfig(
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
@JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading,
@JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers
)
{
this.markSegmentAsUnusedDelayMillis =
Expand Down Expand Up @@ -169,6 +171,7 @@ public CoordinatorDynamicConfig(
this.debugDimensions = debugDimensions;
this.validDebugDimensions = validateDebugDimensions(debugDimensions);
this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, Set.of());
this.cloneServers = Configs.valueOrDefault(cloneServers, Map.of());
}

private Map<Dimension, String> validateDebugDimensions(Map<String, String> debugDimensions)
Expand Down Expand Up @@ -322,6 +325,19 @@ public boolean getReplicateAfterLoadTimeout()
return replicateAfterLoadTimeout;
}

/**
* Map from target Historical server to source Historical server which should be cloned by the target. The target
* Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any
* segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact
* copy of the source. Segments on the target Historical do not count towards replica counts either. If the source
* disappears, the target remains in the last known state of the source server until removed from the cloneServers.
*/
@JsonProperty
Comment thread
adarshsanjeev marked this conversation as resolved.
public Map<String, String> getCloneServers()
Comment thread
adarshsanjeev marked this conversation as resolved.
{
return cloneServers;
}

/**
* List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load
segments. This decreases the average time taken to load segments. However, this also means fewer resources
Expand Down Expand Up @@ -464,6 +480,7 @@ public static class Builder
private Boolean useRoundRobinSegmentAssignment;
private Boolean smartSegmentLoading;
private Set<String> turboLoadingNodes;
private Map<String, String> cloneServers;

public Builder()
{
Expand All @@ -487,7 +504,8 @@ public Builder(
@JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment,
@JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading,
@JsonProperty("debugDimensions") @Nullable Map<String, String> debugDimensions,
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes
@JsonProperty("turboLoadingNodes") @Nullable Set<String> turboLoadingNodes,
@JsonProperty("cloneServers") @Nullable Map<String, String> cloneServers
)
{
this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis;
Expand All @@ -507,6 +525,7 @@ public Builder(
this.smartSegmentLoading = smartSegmentLoading;
this.debugDimensions = debugDimensions;
this.turboLoadingNodes = turboLoadingNodes;
this.cloneServers = cloneServers;
}

public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis)
Expand Down Expand Up @@ -599,6 +618,12 @@ public Builder withUseRoundRobinSegmentAssignment(boolean useRoundRobinSegmentAs
return this;
}

/**
 * Sets the map from target Historical server to the source Historical server it should clone.
 *
 * @return this builder, for method chaining
 */
public Builder withCloneServers(Map<String, String> cloneServers)
{
this.cloneServers = cloneServers;
return this;
}

/**
* Builds a CoordinatorDynamicConfig using either the configured values, or
* the default value if not configured.
Expand All @@ -625,7 +650,8 @@ public CoordinatorDynamicConfig build()
valueOrDefault(useRoundRobinSegmentAssignment, Defaults.USE_ROUND_ROBIN_ASSIGNMENT),
valueOrDefault(smartSegmentLoading, Defaults.SMART_SEGMENT_LOADING),
debugDimensions,
turboLoadingNodes
turboLoadingNodes,
cloneServers
);
}

Expand Down Expand Up @@ -656,7 +682,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
valueOrDefault(useRoundRobinSegmentAssignment, defaults.isUseRoundRobinSegmentAssignment()),
valueOrDefault(smartSegmentLoading, defaults.isSmartSegmentLoading()),
valueOrDefault(debugDimensions, defaults.getDebugDimensions()),
valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes())
valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()),
valueOrDefault(cloneServers, defaults.getCloneServers())
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Contains a representation of the current state of the cluster by tier.
Expand All @@ -44,8 +45,9 @@ public class DruidCluster

private final Set<ServerHolder> realtimes;
private final Map<String, NavigableSet<ServerHolder>> historicals;
private final Map<String, NavigableSet<ServerHolder>> managedHistoricals;
private final Set<ServerHolder> brokers;
private final List<ServerHolder> allServers;
private final List<ServerHolder> allManagedServers;

private DruidCluster(
Set<ServerHolder> realtimes,
Expand All @@ -58,20 +60,42 @@ private DruidCluster(
historicals,
holders -> CollectionUtils.newTreeSet(Comparator.naturalOrder(), holders)
);
this.managedHistoricals = CollectionUtils.mapValues(
historicals,
holders -> {
List<ServerHolder> managedServers = holders.stream()
.filter(serverHolder -> !serverHolder.isUnmanaged())
.collect(Collectors.toList());

return CollectionUtils.newTreeSet(Comparator.naturalOrder(), managedServers);
}
);
this.brokers = Collections.unmodifiableSet(brokers);
this.allServers = initAllServers();
this.allManagedServers = initAllManagedServers();
Comment thread
kfaraz marked this conversation as resolved.
}

/**
 * Returns the realtime servers in this cluster snapshot.
 */
public Set<ServerHolder> getRealtimes()
{
return realtimes;
}

/**
 * Returns all historicals mapped by tier, including unmanaged ones
 * (contrast with {@link #getManagedHistoricals()}, which excludes them).
 */
public Map<String, NavigableSet<ServerHolder>> getHistoricals()
{
return historicals;
}

/**
* Returns all managed historicals. Managed historicals are historicals which can participate in segment assignment,
* drop or balancing.
*/
public Map<String, NavigableSet<ServerHolder>> getManagedHistoricals()
Comment thread
adarshsanjeev marked this conversation as resolved.
Comment thread
adarshsanjeev marked this conversation as resolved.
{
return managedHistoricals;
}

public Set<ServerHolder> getBrokers()
{
return brokers;
Expand All @@ -82,26 +106,26 @@ public Iterable<String> getTierNames()
return historicals.keySet();
}

public NavigableSet<ServerHolder> getHistoricalsByTier(String tier)
public NavigableSet<ServerHolder> getManagedHistoricalsByTier(String tier)
{
return historicals.get(tier);
return managedHistoricals.get(tier);
}

public List<ServerHolder> getAllServers()
public List<ServerHolder> getAllManagedServers()
Comment thread
kfaraz marked this conversation as resolved.
{
return allServers;
return allManagedServers;
}

private List<ServerHolder> initAllServers()
private List<ServerHolder> initAllManagedServers()
{
final int historicalSize = historicals.values().stream().mapToInt(Collection::size).sum();
final int historicalSize = managedHistoricals.values().stream().mapToInt(Collection::size).sum();
final int realtimeSize = realtimes.size();
final List<ServerHolder> allServers = new ArrayList<>(historicalSize + realtimeSize);
final List<ServerHolder> allManagedServers = new ArrayList<>(historicalSize + realtimeSize);

historicals.values().forEach(allServers::addAll);
allServers.addAll(brokers);
allServers.addAll(realtimes);
return allServers;
managedHistoricals.values().forEach(allManagedServers::addAll);
allManagedServers.addAll(brokers);
allManagedServers.addAll(realtimes);
return allManagedServers;
}

public boolean isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.config.KillUnusedSegmentsConfig;
import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.duty.CloneHistoricals;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
Expand Down Expand Up @@ -558,6 +559,7 @@ private List<CoordinatorDuty> makeHistoricalManagementDuties()
new MarkOvershadowedSegmentsAsUnused(deleteSegments),
new MarkEternityTombstonesAsUnused(deleteSegments),
new BalanceSegments(config.getCoordinatorPeriod()),
new CloneHistoricals(loadQueueManager),
new CollectLoadQueueStats()
);
}
Expand Down
Loading
Loading