14 changes: 13 additions & 1 deletion docs/operations/api-reference.md
@@ -86,6 +86,8 @@ Coordinator of the cluster. In addition, returns HTTP 200 if the server is the c
This is suitable for use as a load balancer status check if you only want the active leader to be considered in-service
at the load balancer.


<a name="coordinator-segment-loading"></a>
#### Segment Loading

##### GET
@@ -102,6 +104,14 @@ Returns the number of segments left to load until segments that should be loaded

Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts.

* `/druid/coordinator/v1/loadstatus?full&computeUsingClusterView`

Returns the number of segments left to load in each tier until all segments that should be loaded in the cluster are available.
The result includes segment replication counts and also factors in the number of available nodes of a service type that can load the segment when computing the number of segments remaining to load.
A segment is considered fully loaded when either:
- Druid has replicated it the number of times configured in the corresponding load rule, or
- the number of replicas of the segment in each tier where it is configured to be replicated equals the number of available nodes of a service type that are currently allowed to load the segment in that tier.
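
The response, as with `?full`, is a JSON map of tier to datasource to the number of segments left to load. The sketch below (illustrative only; the method and parameter names are hypothetical, not part of Druid's API) shows the effect of the cluster view: the replica target from the load rule is capped by the number of nodes that can currently load the segment, mirroring the `Math.min` logic in `LoadRule`.

```java
// Illustrative sketch; names are hypothetical.
// ruleReplicants:   replicas configured by the matching load rule for the tier
// availableNodes:   nodes in the tier currently allowed to load the segment
// loadedReplicants: replicas of the segment already loaded in the tier
static long segmentsLeftToLoad(int ruleReplicants, int availableNodes, int loadedReplicants)
{
  // The achievable replica count is capped by the nodes able to load the segment.
  final int possibleReplicants = Math.min(ruleReplicants, availableNodes);
  return Math.max(possibleReplicants - loadedReplicants, 0);
}
```

For example, a rule asking for 3 replicas in a tier that has only 2 Historicals, with 1 replica already loaded, reports 1 segment left to load rather than 2.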

* `/druid/coordinator/v1/loadqueue`

Returns the ids of segments to load and drop for each Historical process.
@@ -151,6 +161,8 @@ Setting `forceMetadataRefresh` to true will force the coordinator to poll latest
(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms
of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status)
Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh.
You can pass the optional query parameter `computeUsingClusterView` to factor in the available cluster services when calculating
the segments left to load. See [Coordinator Segment Loading](#coordinator-segment-loading) for details.
If no used segments are found for the given inputs, this API returns `204 No Content`.
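
As a usage sketch (the Coordinator address `http://coordinator:8081` and datasource name `wikipedia` are placeholders), the per-datasource load status can be polled with any HTTP client:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DatasourceLoadStatusCheck
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder Coordinator address and datasource name.
    final String url = "http://coordinator:8081/druid/coordinator/v1/datasources/wikipedia/loadstatus"
                       + "?forceMetadataRefresh=false&full&computeUsingClusterView";
    final HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).GET().build();
    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // 204 means no used segments were found for the inputs; otherwise the body is a
    // tier -> { dataSource -> underReplicationCount } JSON map.
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
```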

#### Metadata store information
@@ -887,4 +899,4 @@ Returns the dimensions of the datasource.

* `/druid/v2/datasources/{dataSourceName}/metrics`

Returns the metrics of the datasource.
@@ -76,6 +76,7 @@
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.initialization.jetty.ServiceUnavailableException;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -157,6 +158,7 @@ public class DruidCoordinator

private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
private volatile DruidCluster cluster = null;

private int cachedBalancerThreadNumber;
private ListeningExecutorService balancerExec;
@@ -280,7 +282,16 @@ public Map<String, LoadQueuePeon> getLoadManagementPeons()
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
{
final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments);
return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, false);
}

/**
* @return tier -> { dataSource -> underReplicationCount } map
*/
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView()
{
final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, true);
}

/**
@@ -295,37 +306,22 @@ public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataS
Iterable<DataSegment> dataSegments
)
{
return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, false);
}

/**
* The segmentReplicantLookup and cluster used in this method can be stale, since they are only updated on Coordinator runs.
* However, this is acceptable as long as {@param dataSegments} is fresh, because stale data in segmentReplicantLookup
* and cluster can then only undercount replication levels, rather than falsely report that everything is available.
*
* @return tier -> { dataSource -> underReplicationCount } map
*/
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegmentsUsingClusterView(
Iterable<DataSegment> dataSegments
)
{
return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, true);
}

public Object2IntMap<String> computeNumsUnavailableUsedSegmentsPerDataSource()
@@ -584,6 +580,58 @@ public void runCompactSegmentsDuty()
compactSegmentsDuty.run();
}

private Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(
Iterable<DataSegment> dataSegments,
boolean computeUsingClusterView
)
{
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();

if (segmentReplicantLookup == null) {
return underReplicationCountsPerDataSourcePerTier;
}

if (computeUsingClusterView && cluster == null) {
throw new ServiceUnavailableException(
"coordinator hasn't populated information about cluster yet, try again later");
}

final DateTime now = DateTimes.nowUtc();

for (final DataSegment segment : dataSegments) {
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());

for (final Rule rule : rules) {
if (!rule.appliesTo(segment, now)) {
// Rule did not match. Continue to the next Rule.
continue;
}
if (!rule.canLoadSegments()) {
// Rule matched, but this rule cannot load segments.
// Hence, there is no need to update the underReplicationCountsPerDataSourcePerTier map.
break;
}

if (computeUsingClusterView) {
rule.updateUnderReplicatedWithClusterView(
underReplicationCountsPerDataSourcePerTier,
segmentReplicantLookup,
cluster,
segment
);
} else {
rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment);
}

// Only the first matching rule applies. This is because the Coordinator cycles through all used segments
// and matches each segment with the first rule that applies. Each segment may only match a single rule.
break;
}
}

return underReplicationCountsPerDataSourcePerTier;
}

private void becomeLeader()
{
synchronized (lock) {
Expand Down Expand Up @@ -852,7 +900,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)

startPeonsForNewServers(currentServers);

final DruidCluster cluster = prepareCluster(params, currentServers);
cluster = prepareCluster(params, currentServers);
segmentReplicantLookup = SegmentReplicantLookup.make(cluster, getDynamicConfigs().getReplicateAfterLoadTimeout());

stopPeonsForDisappearedServers(currentServers);
@@ -24,6 +24,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
@@ -106,6 +107,21 @@ public void updateUnderReplicated(
}
}

@Override
public void updateUnderReplicatedWithClusterView(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DruidCluster cluster,
DataSegment segment
)
{
updateUnderReplicated(
underReplicatedPerTier,
segmentReplicantLookup,
segment
);
}

private CoordinatorStats assign(
final Set<ServerHolder> serverHolders,
final DataSegment segment
@@ -119,6 +119,34 @@ public void updateUnderReplicated(
});
}

Review comment (Contributor): Can you add an implementation for BroadcastDistributionRule as well? I think the implementation there could just call updateUnderReplicated and ignore the cluster parameter, since it's already looking at the available servers in the cluster.
Reply (Author): Added.

@Override
public void updateUnderReplicatedWithClusterView(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DruidCluster cluster,
DataSegment segment
)
{
getTieredReplicants().forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
Object2LongMap<String> underReplicationPerDataSource = underReplicatedPerTier.computeIfAbsent(
tier,
ignored -> new Object2LongOpenHashMap<>()
);
int possibleReplicants = Math.min(ruleReplicants, cluster.getHistoricals().get(tier).size());
log.debug(
"ruleReplicants: [%d], possibleReplicants: [%d], currentReplicants: [%d]",
ruleReplicants,
possibleReplicants,
currentReplicants
);
((Object2LongOpenHashMap<String>) underReplicationPerDataSource).addTo(
segment.getDataSource(),
Math.max(possibleReplicants - currentReplicants, 0)
);
});
}

/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
*/
@@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
@@ -77,6 +78,22 @@ default void updateUnderReplicated(
Preconditions.checkArgument(!canLoadSegments());
}

/**
* This method should update {@param underReplicatedPerTier} with the replication count of the
* {@param segment}, taking into consideration the number of servers available in the cluster that the segment can be
* replicated on. Rules that return true for {@link Rule#canLoadSegments()} must override this method.
* Note that {@param underReplicatedPerTier} is a map of tier -> { dataSource -> underReplicationCount }.
*/
default void updateUnderReplicatedWithClusterView(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DruidCluster cluster,
DataSegment segment
)
{
Preconditions.checkArgument(!canLoadSegments());
}

/**
* {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be called in Rule's code, because the used
* segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. This is because
Expand Down
@@ -30,6 +30,7 @@
import org.apache.druid.server.http.security.StateResourceFilter;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@@ -85,15 +86,18 @@ public Response isLeader()
@Produces(MediaType.APPLICATION_JSON)
public Response getLoadStatus(
@QueryParam("simple") String simple,
@QueryParam("full") String full
@QueryParam("full") String full,
@QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView
)
{
if (simple != null) {
return Response.ok(coordinator.computeNumsUnavailableUsedSegmentsPerDataSource()).build();
}

if (full != null) {
return Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTier()).build();
return computeUsingClusterView != null
? Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView()).build()
: Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTier()).build();
}
return Response.ok(coordinator.getLoadStatus()).build();
}
@@ -407,7 +407,8 @@ public Response getDatasourceLoadstatus(
@QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh,
@QueryParam("interval") @Nullable final String interval,
@QueryParam("simple") @Nullable final String simple,
@QueryParam("full") @Nullable final String full
@QueryParam("full") @Nullable final String full,
@QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView
)
{
if (forceMetadataRefresh == null) {
@@ -452,8 +453,10 @@ public Response getDatasourceLoadstatus(
).build();
} else if (full != null) {
// Calculate response for full mode
Map<String, Object2LongMap<String>> segmentLoadMap
= coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get());
Map<String, Object2LongMap<String>> segmentLoadMap =
(computeUsingClusterView != null)
? coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegmentsUsingClusterView(segments.get())
: coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get());
if (segmentLoadMap.isEmpty()) {
return Response.serverError()
.entity("Coordinator segment replicant lookup is not initialized yet. Try again later.")
@@ -116,6 +116,7 @@ protected void configureServlets()
binder.bind(CustomExceptionMapper.class).in(Singleton.class);
binder.bind(ForbiddenExceptionMapper.class).in(Singleton.class);
binder.bind(BadRequestExceptionMapper.class).in(Singleton.class);
binder.bind(ServiceUnavailableExceptionMapper.class).in(Singleton.class);

serve("/*").with(DruidGuiceContainer.class);

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.initialization.jetty;

/**
* This class is for any exception that should return a Service Unavailable status code (503).
* See {@code BadQueryException} for query requests.
*
* @see ServiceUnavailableExceptionMapper
*/
public class ServiceUnavailableException extends RuntimeException
{
public ServiceUnavailableException(String msg)
{
super(msg);
}
}
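
The `ServiceUnavailableExceptionMapper` referenced above is bound in the Jetty module but not shown in this diff view. A minimal sketch of such a JAX-RS mapper, assuming it simply translates the exception into an HTTP 503 response (the actual implementation in the PR may differ), could look like:

```java
package org.apache.druid.server.initialization.jetty;

import com.google.common.collect.ImmutableMap;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;

@Provider
public class ServiceUnavailableExceptionMapper implements ExceptionMapper<ServiceUnavailableException>
{
  @Override
  public Response toResponse(ServiceUnavailableException exception)
  {
    // Translate the exception into a 503 with a JSON error body.
    return Response.status(Response.Status.SERVICE_UNAVAILABLE)
                   .type(MediaType.APPLICATION_JSON)
                   .entity(ImmutableMap.of("error", exception.getMessage()))
                   .build();
  }
}
```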