diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index bcce457fd22e..e0d843913518 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -86,6 +86,8 @@ Coordinator of the cluster. In addition, returns HTTP 200 if the server is the c This is suitable for use as a load balancer status check if you only want the active leader to be considered in-service at the load balancer. + + #### Segment Loading ##### GET @@ -102,6 +104,14 @@ Returns the number of segments left to load until segments that should be loaded Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts. +* `/druid/coordinator/v1/loadstatus?full&computeUsingClusterView` + +Returns the number of segments left to load in each tier until all segments that should be loaded in the cluster are available. +The result includes segment replication counts, and it also factors in the number of available nodes of a service type that can load the segment when computing the number of segments left to load. +A segment is considered fully loaded when either: +- Druid has replicated it the number of times configured in the corresponding load rule, or +- the number of replicas of the segment in each tier where it is configured to be replicated equals the number of available nodes of a service type that are currently allowed to load the segment in that tier. + * `/druid/coordinator/v1/loadqueue` Returns the ids of segments to load and drop for each Historical process. @@ -151,6 +161,8 @@ Setting `forceMetadataRefresh` to true will force the coordinator to poll latest (Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status) Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. +You can pass the optional query parameter `computeUsingClusterView` to factor in the available cluster services when calculating +the segments left to load. See [Coordinator Segment Loading](#coordinator-segment-loading) for details. If no used segments are found for the given inputs, this API returns `204 No Content` #### Metadata store information @@ -887,4 +899,4 @@ Returns the dimensions of the datasource. * `/druid/v2/datasources/{dataSourceName}/metrics` -Returns the metrics of the datasource. \ No newline at end of file +Returns the metrics of the datasource.
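As an illustration of the documented behavior, here is a minimal client sketch showing how the new `computeUsingClusterView` parameter might be exercised against a Coordinator. It is not part of this patch: the host and port (`localhost:8081`), the Java 11 `HttpClient` usage, and the class name are assumptions for illustration only.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LoadStatusClusterViewCheck
{
  public static void main(String[] args) throws Exception
  {
    HttpClient client = HttpClient.newHttpClient();

    // Request per-tier under-replication counts, with the target replica count capped by the
    // number of nodes that can actually load each segment (the computeUsingClusterView behavior).
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/druid/coordinator/v1/loadstatus?full&computeUsingClusterView"))
        .GET()
        .build();

    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

    // A 503 response means the Coordinator has not populated its cluster view yet; retry later.
    System.out.println(response.statusCode() + ": " + response.body());
  }
}
```

The same parameter can also be combined with `forceMetadataRefresh` and `full` on the per-datasource `loadstatus` endpoint, as documented above.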
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 58af776863d3..17a3794de5fc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -76,6 +76,7 @@ import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.initialization.ZkPathsConfig; +import org.apache.druid.server.initialization.jetty.ServiceUnavailableException; import org.apache.druid.server.lookup.cache.LookupCoordinatorManager; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -157,6 +158,7 @@ public class DruidCoordinator private volatile boolean started = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; + private volatile DruidCluster cluster = null; private int cachedBalancerThreadNumber; private ListeningExecutorService balancerExec; @@ -280,7 +282,16 @@ public Map getLoadManagementPeons() public Map> computeUnderReplicationCountsPerDataSourcePerTier() { final Iterable dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); - return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments); + return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, false); + } + + /** + * @return tier -> { dataSource -> underReplicationCount } map + */ + public Map> computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView() + { + final Iterable dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); + return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, true); } /** @@ -295,37 +306,22 @@ public Map> computeUnderReplicationCountsPerDataS Iterable dataSegments ) { - final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); - - if (segmentReplicantLookup == null) { - return underReplicationCountsPerDataSourcePerTier; - } - - final DateTime now = DateTimes.nowUtc(); - - for (final DataSegment segment : dataSegments) { - final List rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource()); - - for (final Rule rule : rules) { - if (!rule.appliesTo(segment, now)) { - // Rule did not match. Continue to the next Rule. - continue; - } - if (!rule.canLoadSegments()) { - // Rule matched but rule does not and cannot load segments. - // Hence, there is no need to update underReplicationCountsPerDataSourcePerTier map - break; - } - - rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment); - - // Only the first matching rule applies. This is because the Coordinator cycle through all used segments - // and match each segment with the first rule that applies. Each segment may only match a single rule. - break; - } - } + return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, false); + } - return underReplicationCountsPerDataSourcePerTier; + /** + * segmentReplicantLookup or cluster use in this method could potentially be stale since it is only updated on coordinator runs. 
+ * However, this is ok as long as the {@param dataSegments} is refreshed/up to date, as this would at least still ensure + * that the stale data in segmentReplicantLookup and cluster would under-count replication levels, + * rather than potentially falsely reporting that everything is available. + * + * @return tier -> { dataSource -> underReplicationCount } map + */ + public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegmentsUsingClusterView( + Iterable<DataSegment> dataSegments + ) + { + return computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal(dataSegments, true); } public Object2IntMap<String> computeNumsUnavailableUsedSegmentsPerDataSource() @@ -584,6 +580,58 @@ public void runCompactSegmentsDuty() compactSegmentsDuty.run(); } + private Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegmentsInternal( + Iterable<DataSegment> dataSegments, + boolean computeUsingClusterView + ) + { + final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); + + if (segmentReplicantLookup == null) { + return underReplicationCountsPerDataSourcePerTier; + } + + if (computeUsingClusterView && cluster == null) { + throw new ServiceUnavailableException( + "Coordinator hasn't populated information about the cluster yet, try again later"); + } + + final DateTime now = DateTimes.nowUtc(); + + for (final DataSegment segment : dataSegments) { + final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource()); + + for (final Rule rule : rules) { + if (!rule.appliesTo(segment, now)) { + // Rule did not match. Continue to the next Rule. + continue; + } + if (!rule.canLoadSegments()) { + // Rule matched but rule does not and cannot load segments. + // Hence, there is no need to update underReplicationCountsPerDataSourcePerTier map + break; + } + + if (computeUsingClusterView) { + rule.updateUnderReplicatedWithClusterView( + underReplicationCountsPerDataSourcePerTier, + segmentReplicantLookup, + cluster, + segment + ); + } else { + rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment); + } + + // Only the first matching rule applies. This is because the Coordinator cycles through all used segments + // and matches each segment with the first rule that applies. Each segment may only match a single rule.
+ break; + } + } + + return underReplicationCountsPerDataSourcePerTier; + } + private void becomeLeader() { synchronized (lock) { @@ -852,7 +900,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) startPeonsForNewServers(currentServers); - final DruidCluster cluster = prepareCluster(params, currentServers); + cluster = prepareCluster(params, currentServers); segmentReplicantLookup = SegmentReplicantLookup.make(cluster, getDynamicConfigs().getReplicateAfterLoadTimeout()); stopPeonsForDisappearedServers(currentServers); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java index 0b8c37b79318..58b2ecb4bd2c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CoordinatorStats; +import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.SegmentReplicantLookup; @@ -106,6 +107,21 @@ public void updateUnderReplicated( } } + @Override + public void updateUnderReplicatedWithClusterView( + Map> underReplicatedPerTier, + SegmentReplicantLookup segmentReplicantLookup, + DruidCluster cluster, + DataSegment segment + ) + { + updateUnderReplicated( + underReplicatedPerTier, + segmentReplicantLookup, + segment + ); + } + private CoordinatorStats assign( final Set serverHolders, final DataSegment segment diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index d0bf5b8ad56b..ddf4a8536a07 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -119,6 +119,34 @@ public void updateUnderReplicated( }); } + @Override + public void updateUnderReplicatedWithClusterView( + Map> underReplicatedPerTier, + SegmentReplicantLookup segmentReplicantLookup, + DruidCluster cluster, + DataSegment segment + ) + { + getTieredReplicants().forEach((final String tier, final Integer ruleReplicants) -> { + int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier); + Object2LongMap underReplicationPerDataSource = underReplicatedPerTier.computeIfAbsent( + tier, + ignored -> new Object2LongOpenHashMap<>() + ); + int possibleReplicants = Math.min(ruleReplicants, cluster.getHistoricals().get(tier).size()); + log.debug( + "ruleReplicants: [%d], possibleReplicants: [%d], currentReplicants: [%d]", + ruleReplicants, + possibleReplicants, + currentReplicants + ); + ((Object2LongOpenHashMap) underReplicationPerDataSource).addTo( + segment.getDataSource(), + Math.max(possibleReplicants - currentReplicants, 0) + ); + }); + } + /** * @param stats {@link CoordinatorStats} to accumulate assignment statistics. 
*/ diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java index 02c552f639cc..1674dac35293 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.druid.server.coordinator.CoordinatorStats; +import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.SegmentReplicantLookup; @@ -77,6 +78,22 @@ default void updateUnderReplicated( Preconditions.checkArgument(!canLoadSegments()); } + /** + * This method should update the {@param underReplicatedPerTier} with the replication count of the + * {@param segment} taking into consideration the number of servers available in cluster that the segment can be + * replicated on. Rule that returns true for {@link Rule#canLoadSegments()} must override this method. + * Note that {@param underReplicatedPerTier} is a map of tier -> { dataSource -> underReplicationCount } + */ + default void updateUnderReplicatedWithClusterView( + Map> underReplicatedPerTier, + SegmentReplicantLookup segmentReplicantLookup, + DruidCluster cluster, + DataSegment segment + ) + { + Preconditions.checkArgument(!canLoadSegments()); + } + /** * {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be called in Rule's code, because the used * segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. This is because diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java index 92543e6a9c75..a12056336ead 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java @@ -30,6 +30,7 @@ import org.apache.druid.server.http.security.StateResourceFilter; import org.apache.druid.timeline.DataSegment; +import javax.annotation.Nullable; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; @@ -85,7 +86,8 @@ public Response isLeader() @Produces(MediaType.APPLICATION_JSON) public Response getLoadStatus( @QueryParam("simple") String simple, - @QueryParam("full") String full + @QueryParam("full") String full, + @QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView ) { if (simple != null) { @@ -93,7 +95,9 @@ public Response getLoadStatus( } if (full != null) { - return Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTier()).build(); + return computeUsingClusterView != null + ? 
Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView()).build() : + Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTier()).build(); } return Response.ok(coordinator.getLoadStatus()).build(); } diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 444481d2f72d..0b128a6cc9e4 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -407,7 +407,8 @@ public Response getDatasourceLoadstatus( @QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh, @QueryParam("interval") @Nullable final String interval, @QueryParam("simple") @Nullable final String simple, - @QueryParam("full") @Nullable final String full + @QueryParam("full") @Nullable final String full, + @QueryParam("computeUsingClusterView") @Nullable String computeUsingClusterView ) { if (forceMetadataRefresh == null) { @@ -452,8 +453,10 @@ public Response getDatasourceLoadstatus( ).build(); } else if (full != null) { // Calculate response for full mode - Map> segmentLoadMap - = coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get()); + Map> segmentLoadMap = + (computeUsingClusterView != null) ? + coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegmentsUsingClusterView(segments.get()) : + coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get()); if (segmentLoadMap.isEmpty()) { return Response.serverError() .entity("Coordinator segment replicant lookup is not initialized yet. Try again later.") diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index 6fc105f4ac74..351a4c02e430 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -116,6 +116,7 @@ protected void configureServlets() binder.bind(CustomExceptionMapper.class).in(Singleton.class); binder.bind(ForbiddenExceptionMapper.class).in(Singleton.class); binder.bind(BadRequestExceptionMapper.class).in(Singleton.class); + binder.bind(ServiceUnavailableExceptionMapper.class).in(Singleton.class); serve("/*").with(DruidGuiceContainer.class); diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/ServiceUnavailableException.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/ServiceUnavailableException.java new file mode 100644 index 000000000000..68b26b3f70eb --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/ServiceUnavailableException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.initialization.jetty; + +/** + * This class is for any exceptions that should return a Service unavailable status code (503). + * See {@code BadQueryException} for query requests. + * + * @see ServiceUnavailableExceptionMapper + */ +public class ServiceUnavailableException extends RuntimeException +{ + public ServiceUnavailableException(String msg) + { + super(msg); + } +} diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/ServiceUnavailableExceptionMapper.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/ServiceUnavailableExceptionMapper.java new file mode 100644 index 000000000000..4e704d8f198e --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/ServiceUnavailableExceptionMapper.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.initialization.jetty; + +import com.google.common.collect.ImmutableMap; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +@Provider +public class ServiceUnavailableExceptionMapper implements ExceptionMapper<ServiceUnavailableException> +{ + @Override + public Response toResponse(ServiceUnavailableException exception) + { + return Response.status(Response.Status.SERVICE_UNAVAILABLE) + .type(MediaType.APPLICATION_JSON) + .entity(ImmutableMap.of("error", exception.getMessage())) + .build(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 31bfeeccc77a..36f1bf67f0be 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -412,6 +412,24 @@ public void testCoordinatorRun() throws Exception // The load rules asks for 2 replicas, therefore 1 replica should still be pending Assert.assertEquals(1L, underRepliicationCountsPerDataSource.getLong(dataSource)); + + Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTierUsingClusterView = + coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView(); + Assert.assertNotNull(underReplicationCountsPerDataSourcePerTierUsingClusterView); + Assert.assertEquals(1, underReplicationCountsPerDataSourcePerTierUsingClusterView.size()); + + Object2LongMap<String> underRepliicationCountsPerDataSourceUsingClusterView = + underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tier); + Assert.assertNotNull(underRepliicationCountsPerDataSourceUsingClusterView); + Assert.assertEquals(1, underRepliicationCountsPerDataSourceUsingClusterView.size()); + //noinspection deprecation + Assert.assertNotNull(underRepliicationCountsPerDataSourceUsingClusterView.get(dataSource)); + // Simulates the adding of the segment to druidServer during the SegmentChangeRequestLoad event. + // The load rule asks for 2 replicas, but there is only 1 historical server in the cluster. When computing with + // the cluster view, segments count as replicated as many times as the current cluster allows, so the datasource + // should not be reported as under-replicated.
+ Assert.assertEquals(0L, underRepliicationCountsPerDataSourceUsingClusterView.getLong(dataSource)); + coordinator.stop(); leaderUnannouncerLatch.await(); @@ -499,6 +517,12 @@ public void testCoordinatorTieredRun() throws Exception Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource)); Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource)); + Map> underReplicationCountsPerDataSourcePerTierUsingClusterView = + coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView(); + Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTierUsingClusterView.size()); + Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource)); + Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource)); + coordinator.stop(); leaderUnannouncerLatch.await(); @@ -660,6 +684,14 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName1).getLong(dataSource)); Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName2).getLong(dataSource)); + Map> underReplicationCountsPerDataSourcePerTierUsingClusterView = + coordinator.computeUnderReplicationCountsPerDataSourcePerTierUsingClusterView(); + Assert.assertEquals(4, underReplicationCountsPerDataSourcePerTierUsingClusterView.size()); + Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(hotTierName).getLong(dataSource)); + Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(coldTierName).getLong(dataSource)); + Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tierName1).getLong(dataSource)); + Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTierUsingClusterView.get(tierName2).getLong(dataSource)); + coordinator.stop(); leaderUnannouncerLatch.await(); diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 02747a354970..3ca1e0fe6da1 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -1201,7 +1201,7 @@ public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments() public void testGetDatasourceLoadstatusForceMetadataRefreshNull() { DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); - Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null, null); Assert.assertEquals(400, response.getStatus()); } @@ -1223,7 +1223,7 @@ public void testGetDatasourceLoadstatusNoSegmentForInterval() null, null ); - Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null, null); Assert.assertEquals(204, response.getStatus()); } @@ -1281,7 +1281,7 @@ public void testGetDatasourceLoadstatusDefault() EasyMock.replay(segmentsMetadataManager, inventoryView); DataSourcesResource 
dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); - Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1297,7 +1297,7 @@ public void testGetDatasourceLoadstatusDefault() EasyMock.replay(segmentsMetadataManager, inventoryView); dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); - response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null); + response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1360,7 +1360,7 @@ public void testGetDatasourceLoadstatusSimple() EasyMock.replay(segmentsMetadataManager, inventoryView); DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); - Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1376,7 +1376,7 @@ public void testGetDatasourceLoadstatusSimple() EasyMock.replay(segmentsMetadataManager, inventoryView); dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); - response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null); + response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1431,7 +1431,64 @@ public void testGetDatasourceLoadstatusFull() EasyMock.replay(segmentsMetadataManager, druidCoordinator); DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, druidCoordinator); - Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full"); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full", null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(2, ((Map) response.getEntity()).size()); + Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier1")).size()); + Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier2")).size()); + Assert.assertEquals(0L, ((Map) ((Map) response.getEntity()).get("tier1")).get("datasource1")); + Assert.assertEquals(3L, ((Map) ((Map) response.getEntity()).get("tier2")).get("datasource1")); + EasyMock.verify(segmentsMetadataManager); + } + + @Test + public void testGetDatasourceLoadstatusFullAndComputeUsingClusterView() + { + DataSegment datasource1Segment1 = new DataSegment( + "datasource1", + 
Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 10 + ); + + DataSegment datasource1Segment2 = new DataSegment( + "datasource1", + Intervals.of("2010-01-22/P1D"), + "", + null, + null, + null, + null, + 0x9, + 20 + ); + List segments = ImmutableList.of(datasource1Segment1, datasource1Segment2); + + final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); + Object2LongMap tier1 = new Object2LongOpenHashMap<>(); + tier1.put("datasource1", 0L); + Object2LongMap tier2 = new Object2LongOpenHashMap<>(); + tier2.put("datasource1", 3L); + underReplicationCountsPerDataSourcePerTier.put("tier1", tier1); + underReplicationCountsPerDataSourcePerTier.put("tier2", tier2); + + // Test when datasource fully loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class); + EasyMock.expect(druidCoordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegmentsUsingClusterView(segments)) + .andReturn(underReplicationCountsPerDataSourcePerTier).once(); + + EasyMock.replay(segmentsMetadataManager, druidCoordinator); + + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, druidCoordinator); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full", "computeUsingClusterView"); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(2, ((Map) response.getEntity()).size());