Merged
4 changes: 3 additions & 1 deletion docs/configuration/index.md
@@ -775,7 +775,8 @@ A sample Coordinator dynamic config JSON object is shown below:
"killDataSourceWhitelist": ["wikipedia", "testDatasource"],
"decommissioningNodes": ["localhost:8182", "localhost:8282"],
"decommissioningMaxPercentOfMaxSegmentsToMove": 70,
"pauseCoordination": false
"pauseCoordination": false,
"replicateAfterLoadTimeout": false
}
```

@@ -799,6 +800,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
|`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run, which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _nor to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode in which they do not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments on. By adjusting the maximum percent of decommissioning segment movements, an operator can prevent active servers from becoming overloaded by prioritizing balancing, or instead decrease decommissioning time. The value should be between 0 and 100.|70|
|`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false|
|`replicateAfterLoadTimeout`| Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow historicals in the cluster. However, the slow historical may still load the segment later and the coordinator may issue drop requests if the segment is over-replicated.|false|
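
For reference, the flag above can be toggled at runtime through the Coordinator dynamic config endpoint (`POST /druid/coordinator/v1/config`). A minimal payload might look like the following; judging by the `build(CoordinatorDynamicConfig defaults)` overload in this PR, fields omitted from the payload keep their currently active values, but verify this against your Druid version:

```json
{
  "replicateAfterLoadTimeout": true
}
```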


To view the audit history of Coordinator dynamic config, issue a GET request to the URL -
@@ -88,6 +88,14 @@ public class CoordinatorDynamicConfig
private final int maxSegmentsInNodeLoadingQueue;
private final boolean pauseCoordination;

/**
* This decides whether additional replication is needed for segments that have failed to load due to a load timeout.
* When enabled, the coordinator will attempt to replicate the failed segment on a different historical server.
* The historical which failed to load the segment may still load the segment later. Therefore, enabling this setting
* works better if there are a few slow historicals in the cluster and segment availability needs to be sped up.
*/
private final boolean replicateAfterLoadTimeout;

private static final Logger log = new Logger(CoordinatorDynamicConfig.class);

@JsonCreator
@@ -120,7 +128,8 @@ public CoordinatorDynamicConfig(
@JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
@JsonProperty("decommissioningNodes") Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") boolean pauseCoordination
@JsonProperty("pauseCoordination") boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -166,6 +175,7 @@ public CoordinatorDynamicConfig(
);
}
this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
}

private static Set<String> parseJsonStringOrArray(Object jsonStringOrArray)
@@ -320,6 +330,12 @@ public boolean getPauseCoordination()
return pauseCoordination;
}

@JsonProperty
public boolean getReplicateAfterLoadTimeout()
{
return replicateAfterLoadTimeout;
}

@Override
public String toString()
{
@@ -341,6 +357,7 @@ public String toString()
", decommissioningNodes=" + decommissioningNodes +
", decommissioningMaxPercentOfMaxSegmentsToMove=" + decommissioningMaxPercentOfMaxSegmentsToMove +
", pauseCoordination=" + pauseCoordination +
", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout +
'}';
}

@@ -402,6 +419,9 @@ public boolean equals(Object o)
if (pauseCoordination != that.pauseCoordination) {
return false;
}
if (replicateAfterLoadTimeout != that.replicateAfterLoadTimeout) {
return false;
}
return decommissioningMaxPercentOfMaxSegmentsToMove == that.decommissioningMaxPercentOfMaxSegmentsToMove;
}

@@ -449,6 +469,7 @@ public static class Builder
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;

private Long leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments;
private Long mergeBytesLimit;
@@ -466,6 +487,7 @@ public static class Builder
private Object decommissioningNodes;
private Integer decommissioningMaxPercentOfMaxSegmentsToMove;
private Boolean pauseCoordination;
private Boolean replicateAfterLoadTimeout;

public Builder()
{
@@ -490,7 +512,8 @@ public Builder(
@JsonProperty("decommissioningNodes") @Nullable Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove")
@Nullable Integer decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination
@JsonProperty("pauseCoordination") @Nullable Boolean pauseCoordination,
@JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout
)
{
this.leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments =
@@ -510,6 +533,7 @@ public Builder(
this.decommissioningNodes = decommissioningNodes;
this.decommissioningMaxPercentOfMaxSegmentsToMove = decommissioningMaxPercentOfMaxSegmentsToMove;
this.pauseCoordination = pauseCoordination;
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
}

public Builder withLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments(long leadingTimeMillis)
@@ -602,6 +626,12 @@ public Builder withPauseCoordination(boolean pauseCoordination)
return this;
}

public Builder withReplicateAfterLoadTimeout(boolean replicateAfterLoadTimeout)
{
this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
return this;
}

public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
@@ -629,7 +659,8 @@ public CoordinatorDynamicConfig build()
decommissioningMaxPercentOfMaxSegmentsToMove == null
? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
: decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination,
replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout
);
}
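
The two `build` overloads above rely on a common pattern: the Builder holds boxed types so that `null` means "not set", and the terminal call substitutes either the compile-time default or the currently active config's value. A minimal standalone sketch of that pattern for the new flag (the `ConfigSketch` class is hypothetical, not the actual Druid code):

```java
// Standalone sketch (hypothetical ConfigSketch, not the Druid class) of the
// Builder's null-coalescing pattern for replicateAfterLoadTimeout.
public class ConfigSketch {
    static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;

    final boolean replicateAfterLoadTimeout;

    ConfigSketch(boolean replicateAfterLoadTimeout) {
        this.replicateAfterLoadTimeout = replicateAfterLoadTimeout;
    }

    static class Builder {
        private Boolean replicateAfterLoadTimeout; // boxed: null means "not set"

        Builder withReplicateAfterLoadTimeout(boolean value) {
            this.replicateAfterLoadTimeout = value;
            return this;
        }

        // Fresh build: unset fields fall back to the compile-time default.
        ConfigSketch build() {
            return new ConfigSketch(
                replicateAfterLoadTimeout == null
                    ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT
                    : replicateAfterLoadTimeout
            );
        }

        // Partial update: unset fields fall back to the currently active config.
        ConfigSketch build(ConfigSketch current) {
            return new ConfigSketch(
                replicateAfterLoadTimeout == null
                    ? current.replicateAfterLoadTimeout
                    : replicateAfterLoadTimeout
            );
        }
    }

    public static void main(String[] args) {
        ConfigSketch active = new Builder().withReplicateAfterLoadTimeout(true).build();
        // An update that doesn't mention the flag preserves the active value.
        System.out.println(new Builder().build(active).replicateAfterLoadTimeout); // true
        // A fresh config uses the default.
        System.out.println(new Builder().build().replicateAfterLoadTimeout); // false
    }
}
```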

@@ -663,7 +694,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
decommissioningMaxPercentOfMaxSegmentsToMove == null
? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
: decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination
pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination,
replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout
);
}
}
@@ -111,6 +111,15 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);

/**
* Needs to be thread safe since it can be concurrently accessed via
* {@link #failAssign(SegmentHolder, boolean, Exception)}, {@link #actionCompleted(SegmentHolder)},
* {@link #getTimedOutSegments()} and {@link #stop()}
*/
private final ConcurrentSkipListSet<DataSegment> timedOutSegments = new ConcurrentSkipListSet<>(
DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
);
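
The comment above points at the concurrency requirement; a minimal sketch of why `ConcurrentSkipListSet` fits (the `Segment` stand-in and its comparator are hypothetical simplifications of `DataSegment` and `SEGMENT_COMPARATOR_RECENT_FIRST`):

```java
import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;

public class TimedOutSetSketch {
    // Hypothetical stand-in for DataSegment, ordered "recent first".
    static class Segment {
        final String id;
        final long startMillis;

        Segment(String id, long startMillis) {
            this.id = id;
            this.startMillis = startMillis;
        }
    }

    // ConcurrentSkipListSet supports lock-free adds/removes and weakly consistent
    // iteration, so a getTimedOutSegments()-style reader never blocks the
    // callback threads that mutate the set.
    static final ConcurrentSkipListSet<Segment> timedOutSegments = new ConcurrentSkipListSet<>(
        Comparator.comparingLong((Segment s) -> s.startMillis).reversed()
                  .thenComparing(Comparator.comparing((Segment s) -> s.id))
    );

    public static void main(String[] args) {
        timedOutSegments.add(new Segment("seg-old", 1L));
        timedOutSegments.add(new Segment("seg-new", 2L));
        System.out.println(timedOutSegments.first().id); // most recent first: seg-new
        // On a successful load/drop the segment is removed again; membership is
        // decided by the comparator, so an equal-ordering instance suffices.
        timedOutSegments.remove(new Segment("seg-old", 1L));
        System.out.println(timedOutSegments.size()); // 1
    }
}
```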

CuratorLoadQueuePeon(
CuratorFramework curator,
String basePath,
@@ -149,6 +158,12 @@ public Set<DataSegment> getSegmentsMarkedToDrop()
return segmentsMarkedToDrop;
}

@Override
public Set<DataSegment> getTimedOutSegments()
{
return timedOutSegments;
}

@Override
public long getLoadQueueSize()
{
@@ -268,10 +283,10 @@ public void run()
// This is expected when historicals haven't yet picked up processing this segment and coordinator
// tries reassigning it to the same node.
log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
failAssign(segmentHolder);
failAssign(segmentHolder, true);
}
catch (Exception e) {
failAssign(segmentHolder, e);
failAssign(segmentHolder, false, e);
}
}

@@ -282,14 +297,21 @@ private ScheduledFuture<?> scheduleNodeDeletedCheck(String path)
() -> {
try {
if (curator.checkExists().forPath(path) != null) {
failAssign(segmentHolder, new ISE("%s was never removed! Failing this operation!", path));
failAssign(
segmentHolder,
true,
new ISE("Failing this %s operation since it timed out and %s was never removed! These segments might still get processed",
segmentHolder.getType() == DROP ? "DROP" : "LOAD",
path
)
);
} else {
log.debug("%s detected to be removed. ", path);
}
}
catch (Exception e) {
log.error(e, "Exception caught and ignored when checking whether zk node was deleted");
failAssign(segmentHolder, e);
failAssign(segmentHolder, false, e);
}
},
config.getLoadTimeoutDelay().getMillis(),
@@ -307,10 +329,12 @@ private void actionCompleted(SegmentHolder segmentHolder)
// See https://github.com/apache/druid/pull/10362 for more details.
if (null != segmentsToLoad.remove(segmentHolder.getSegment())) {
queuedSize.addAndGet(-segmentHolder.getSegmentSize());
timedOutSegments.remove(segmentHolder.getSegment());
}
break;
case DROP:
segmentsToDrop.remove(segmentHolder.getSegment());
timedOutSegments.remove(segmentHolder.getSegment());
break;
default:
throw new UnsupportedOperationException();
@@ -337,6 +361,7 @@ public void stop()
}
segmentsToLoad.clear();

timedOutSegments.clear();
queuedSize.set(0L);
failedAssignCount.set(0);
}
@@ -361,21 +386,33 @@ private void entryRemoved(SegmentHolder segmentHolder, String path)
);
}

private void failAssign(SegmentHolder segmentHolder)
private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout)
{
failAssign(segmentHolder, null);
failAssign(segmentHolder, handleTimeout, null);
}

private void failAssign(SegmentHolder segmentHolder, Exception e)
private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout, Exception e)
{
if (e != null) {
log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, segmentHolder);
**Contributor:**
I'm not sure why we don't emit exceptions currently (using `EmittingLogger.makeAlert()`), but should we? At least for the segment loading timeout error, it would be nice to emit those errors so that cluster operators can notice there is something going wrong with segment loading.

**Contributor Author:**
Alerting sounds like a good idea, but my concern is that since the alert would happen per segment, slowness on the historical side could generate a large number of alerts for a fairly large cluster. What do you think?

**Contributor Author:**
Also, as a follow-up PR I was planning to add the timed-out segment list to `/druid/coordinator/v1/loadqueue`, along with some docs about its usage in understanding the cluster behavior.

**Contributor:**
> Alerting sounds like a good idea, but my concern is that since the alert would happen per segment, slowness on the historical side could generate a large number of alerts for a fairly large cluster. What do you think?

I think it's a valid concern. We may be able to emit those exceptions in bulk if they are thrown within a short time frame. I believe this should be done in a separate PR even if we want it, and thus my comment is not a blocker for this PR.

> Also, as a follow-up PR I was planning to add the timed-out segment list to `/druid/coordinator/v1/loadqueue`, along with some docs about its usage in understanding the cluster behavior.

Thanks. It sounds good to me.
}
failedAssignCount.getAndIncrement();
// Act like it was completed so that the coordinator gives it to someone else
actionCompleted(segmentHolder);
}

if (handleTimeout) {
// Avoid removing the segment entry from the load/drop list in case config.getLoadTimeoutDelay() expires.
// This is because the ZK Node is still present and it may be processed after this timeout and so the coordinator
// needs to take this into account.
log.debug(
"Skipping segment removal from [%s] queue, since ZK Node still exists!",
segmentHolder.getType() == DROP ? "DROP" : "LOAD"
);
timedOutSegments.add(segmentHolder.getSegment());
executeCallbacks(segmentHolder);
} else {
// This may have failed for a different reason and so act like it was completed.
actionCompleted(segmentHolder);
}
}

private static class SegmentHolder
{
@@ -853,7 +853,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
startPeonsForNewServers(currentServers);

final DruidCluster cluster = prepareCluster(params, currentServers);
segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
segmentReplicantLookup = SegmentReplicantLookup.make(cluster, getDynamicConfigs().getReplicateAfterLoadTimeout());

stopPeonsForDisappearedServers(currentServers);

@@ -428,6 +428,12 @@ public Set<DataSegment> getSegmentsToDrop()
return Collections.unmodifiableSet(segmentsToDrop.keySet());
}

@Override
public Set<DataSegment> getTimedOutSegments()
{
return Collections.emptySet();
}

@Override
public long getLoadQueueSize()
{
@@ -37,6 +37,8 @@ public abstract class LoadQueuePeon

public abstract Set<DataSegment> getSegmentsToDrop();

public abstract Set<DataSegment> getTimedOutSegments();

public abstract void unmarkSegmentToDrop(DataSegment segmentToLoad);


@@ -36,9 +36,15 @@
*/
public class SegmentReplicantLookup
{
public static SegmentReplicantLookup make(DruidCluster cluster)
public static SegmentReplicantLookup make(DruidCluster cluster, boolean replicateAfterLoadTimeout)
{
final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();

/**
* For each tier, this table stores the number of replicants for each segment presently queued to load in the
* cluster. Segments that have failed to load due to the load timeout may not be present in this table if
* {@code replicateAfterLoadTimeout} is true. This enables additional replication of the timed-out segments
* for improved availability.
*/
final Table<SegmentId, String, Integer> loadingSegments = HashBasedTable.create();

for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
@@ -59,7 +65,11 @@ public static SegmentReplicantLookup make(DruidCluster cluster)
if (numReplicants == null) {
numReplicants = 0;
}
loadingSegments.put(segment.getId(), server.getTier(), numReplicants + 1);
// Timed out segments need to be replicated in another server for faster availability.
// Therefore we skip incrementing numReplicants for timed out segments if replicateAfterLoadTimeout is enabled.
if (!replicateAfterLoadTimeout || !serverHolder.getPeon().getTimedOutSegments().contains(segment)) {
loadingSegments.put(segment.getId(), server.getTier(), numReplicants + 1);
**Contributor:**
`loadingSegments` is not just a set of segments loading anymore. Please add some javadoc in `SegmentReplicantLookup` about this.

**Contributor:**
As @himanshug pointed out in #10193 (comment), there could be two types of slow segment loading:

- There are a few historicals being slow at segment loading in the cluster. This can be caused by unbalanced load queues or some intermittent failures.
- Historicals are OK, but ingestion might outpace the ability to load segments.

This particular change in `SegmentReplicantLookup` could help in the former case, but make things worse in the latter case. In an extreme case, all historicals could have the same set of timed-out segments in their load queue. This might still be OK though, because, if that's the case, Druid cannot get out of that state by itself anyway. The system administrator should add more historicals or use more threads for parallel segment loading. However, we should provide relevant data so that system administrators can tell what's happening. I left another comment about emitting exceptions to provide such data.

**Contributor Author:**
@jihoonson @himanshug Would it make sense to make the replication behavior user-configurable? We could have a dynamic config like `replicateAfterLoadTimeout`, which would control whether segments are replicated to a different historical in case of a load timeout on the current historical. The default could be true, but a cluster operator can set this to false if they wish to avoid the additional churn and know the historicals are OK and will eventually load the segments.

**Member:**
Adding a config seems reasonable to me 👍

**Contributor:**
It sounds good to me too.

**Contributor Author:**
Added a config. I've set `replicateAfterLoadTimeout` to false as the default; I feel it might be better to preserve the existing behaviour, and admins need to be aware of this property's behavior before setting it to true. Let me know what you think.

**Contributor:**
It sounds good to me to preserve the existing behavior by default.
}
}
}
}
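
The counting rule above can be sketched in isolation: a queued segment normally increments its tier's in-flight replicant count, but with `replicateAfterLoadTimeout` enabled, timed-out segments are left uncounted so the coordinator sees them as under-replicated and assigns another copy. This standalone sketch uses plain nested maps in place of Guava's `Table` and hypothetical string IDs (note that in the real code the timed-out set lives on each server's load queue peon, not per tier):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReplicantLookupSketch {
    // Nested-Map stand-in for Guava's Table<SegmentId, String, Integer>:
    // result.get(segmentId).get(tier) => number of in-flight (loading) replicants.
    static Map<String, Map<String, Integer>> count(
        Map<String, List<String>> queuedByTier,   // tier -> segment IDs queued to load
        Map<String, Set<String>> timedOutByTier,  // tier -> segment IDs whose load timed out
        boolean replicateAfterLoadTimeout
    ) {
        Map<String, Map<String, Integer>> loadingSegments = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : queuedByTier.entrySet()) {
            String tier = entry.getKey();
            Set<String> timedOut = timedOutByTier.getOrDefault(tier, Set.of());
            for (String segmentId : entry.getValue()) {
                // Skip timed-out segments when the flag is on, so the rule runner
                // schedules an extra replica on a different server.
                if (!replicateAfterLoadTimeout || !timedOut.contains(segmentId)) {
                    loadingSegments
                        .computeIfAbsent(segmentId, k -> new HashMap<>())
                        .merge(tier, 1, Integer::sum);
                }
            }
        }
        return loadingSegments;
    }

    public static void main(String[] args) {
        Map<String, List<String>> queued = Map.of("_default_tier", List.of("seg1", "seg2"));
        Map<String, Set<String>> timedOut = Map.of("_default_tier", Set.of("seg2"));
        // With the flag off, the timed-out segment still counts as loading.
        System.out.println(count(queued, timedOut, false));
        // With the flag on, seg2 is absent, so it appears under-replicated.
        System.out.println(count(queued, timedOut, true));
    }
}
```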
@@ -32,7 +32,7 @@ public static DruidCoordinatorRuntimeParams.Builder newBuilder(DruidCluster drui
{
return newBuilder()
.withDruidCluster(druidCluster)
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster));
.withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false));
}

private CoordinatorRuntimeParamsTestHelpers()