diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md index 1e6ffe10254f..308407fa3426 100644 --- a/docs/ingestion/faq.md +++ b/docs/ingestion/faq.md @@ -66,6 +66,20 @@ Other common reasons that hand-off fails are as follows: Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, dependencies (that can be obtained by running command `hadoop classpath` on a machine where hadoop has been setup) in the classpath. And, provide necessary HDFS settings as described in [deep storage](../dependencies/deep-storage.md) . +## How do I know when I can query Druid after submitting a batch ingestion task? + +You can verify whether segments created by a recent ingestion task are loaded onto historicals and available for querying using the following workflow. +1. Submit your ingestion task. +2. Repeatedly poll the [Overlord's tasks API](../operations/api-reference.md#tasks) (`/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. +3. Poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with +`forceMetadataRefresh=true` and `interval=<INTERVAL_OF_INGESTED_DATA>` once. +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms of the load on the metadata store but is necessary to make sure that we verify all the latest segments' load status.) +If there are segments not yet loaded, continue to step 4; otherwise, you can now query the data. +4. Repeatedly poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with +`forceMetadataRefresh=false` and `interval=<INTERVAL_OF_INGESTED_DATA>`. +Continue polling until all segments are loaded. Once all segments are loaded, you can query the data. 
+Note that this workflow only guarantees that the segments are available at the time of the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) call. Segments can still become missing because of historical process failures or any other reasons afterward. + ## I don't see my Druid segments on my Historical processes You can check the Coordinator console located at `:`. Make sure that your segments have actually loaded on [Historical processes](../design/historical.md). If your segments are not present, check the Coordinator logs for messages about capacity of replication errors. One reason that segments are not downloaded is because Historical processes have maxSizes that are too small, making them incapable of downloading more data. You can change that with (for example): diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index a66dd649f410..a3610a8bc52a 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment * `/druid/coordinator/v1/loadstatus?simple` -Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replication. +Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include segment replication counts. * `/druid/coordinator/v1/loadstatus?full` -Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes replication. +Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts. 
* `/druid/coordinator/v1/loadqueue` @@ -114,6 +114,45 @@ Returns the number of segments to load and drop, as well as the total segment lo Returns the serialized JSON of segments to load and drop for each Historical process. + +#### Segment Loading by Datasource + +Note that all _interval_ query parameters are ISO 8601 strings (e.g., 2016-06-27/2016-06-28). +Also note that these APIs only guarantee that the segments are available at the time of the call. +Segments can still become missing because of historical process failures or any other reasons afterward. + +##### GET + +* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}` + +Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given +datasource over the given interval (or last 2 weeks if interval is not given). `forceMetadataRefresh` is required to be set. +Setting `forceMetadataRefresh` to true will force the coordinator to poll the latest segment metadata from the metadata store. +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms +of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status.) +Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. +If no used segments are found for the given inputs, this API returns `204 No Content`. + + * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}` + +Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource +over the given interval (or last 2 weeks if interval is not given). This does not include segment replication counts. `forceMetadataRefresh` is required to be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll the latest segment metadata from the metadata store. +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms +of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status.) +Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. +If no used segments are found for the given inputs, this API returns `204 No Content`. + +* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}` + +Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource +over the given interval (or last 2 weeks if interval is not given). This includes segment replication counts. `forceMetadataRefresh` is required to be set. +Setting `forceMetadataRefresh` to true will force the coordinator to poll the latest segment metadata from the metadata store. +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms +of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status.) +Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. 
+If no used segments are found for the given inputs, this API returns `204 No Content` + #### Metadata store information ##### GET diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java index 2517a8f0e9be..538cc2f526f3 100644 --- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java @@ -200,6 +200,10 @@ public VersionedIntervalTimeline getTimeline(DataSource } } + public Map getSegmentLoadInfos() + { + return segmentLoadInfos; + } @Override public DruidServer getInventoryValue(String serverKey) diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 4f97b158021e..889141a89c15 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.timeline.DataSegment; @@ -113,6 +114,22 @@ int markAsUsedNonOvershadowedSegments(String dataSource, Set segmentIds) */ Iterable iterateAllUsedSegments(); + /** + * Returns an iterable to go over all used and non-overshadowed segments of given data sources over given interval. + * The order in which segments are iterated is unspecified. Note: the iteration may not be as trivially cheap as, + * for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it + * iterates the returned iterable only once rather than several times. 
+ * If {@param requiresLatest} is true then a force metadatastore poll will be triggered. This can cause a longer + * response time but will ensure that the latest segment information (at the time this method is called) is returned. + * If {@param requiresLatest} is false then segment information from stale snapshot of up to the last periodic poll + * period {@link SqlSegmentsMetadataManager#periodicPollDelay} will be used. + */ + Optional> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + String datasource, + Interval interval, + boolean requiresLatest + ); + /** * Retrieves all data source names for which there are segment in the database, regardless of whether those segments * are used or not. If there are no segments in the database, returns an empty set. diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 92a874863527..60f7f4b86eeb 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -20,6 +20,8 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -44,6 +46,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -95,7 +98,8 @@ private interface DatabasePoll {} /** Represents periodic {@link #poll}s 
happening from {@link #exec}. */ - private static class PeriodicDatabasePoll implements DatabasePoll + @VisibleForTesting + static class PeriodicDatabasePoll implements DatabasePoll { /** * This future allows to wait until {@link #dataSourcesSnapshot} is initialized in the first {@link #poll()} @@ -104,13 +108,15 @@ private static class PeriodicDatabasePoll implements DatabasePoll * leadership changes. */ final CompletableFuture firstPollCompletionFuture = new CompletableFuture<>(); + long lastPollStartTimestampInMs = -1; } /** * Represents on-demand {@link #poll} initiated at periods of time when SqlSegmentsMetadataManager doesn't poll the database * periodically. */ - private static class OnDemandDatabasePoll implements DatabasePoll + @VisibleForTesting + static class OnDemandDatabasePoll implements DatabasePoll { final long initiationTimeNanos = System.nanoTime(); final CompletableFuture pollCompletionFuture = new CompletableFuture<>(); @@ -127,7 +133,7 @@ long nanosElapsedFromInitiation() * called at the same time if two different threads are calling them. This might be possible if Coordinator gets and * drops leadership repeatedly in quick succession. * - * This lock is also used to synchronize {@link #awaitOrPerformDatabasePoll} for times when SqlSegmentsMetadataManager + * This lock is also used to synchronize {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} for times when SqlSegmentsMetadataManager * is not polling the database periodically (in other words, when the Coordinator is not the leader). */ private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock(); @@ -155,7 +161,7 @@ long nanosElapsedFromInitiation() * easy to forget to do. * * This field may be updated from {@link #exec}, or from whatever thread calling {@link #doOnDemandPoll} via {@link - * #awaitOrPerformDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager. 
+ * #useLatestIfWithinDelayOrPerformNewDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager. */ private volatile @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = null; @@ -170,7 +176,7 @@ long nanosElapsedFromInitiation() * Note that if there is a happens-before relationship between a call to {@link #startPollingDatabasePeriodically()} * (on Coordinators' leadership change) and one of the methods accessing the {@link #dataSourcesSnapshot}'s state in * this class the latter is guaranteed to await for the initiated periodic poll. This is because when the latter - * method calls to {@link #awaitLatestDatabasePoll()} via {@link #awaitOrPerformDatabasePoll}, they will + * method calls to {@link #useLatestSnapshotIfWithinDelay()} via {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll}, they will * see the latest {@link PeriodicDatabasePoll} value (stored in this field, latestDatabasePoll, in {@link * #startPollingDatabasePeriodically()}) and to await on its {@link PeriodicDatabasePoll#firstPollCompletionFuture}. * @@ -185,7 +191,7 @@ long nanosElapsedFromInitiation() * SegmentsMetadataManager} and guarantee that it always returns consistent and relatively up-to-date data from methods * like {@link #getImmutableDataSourceWithUsedSegments}, while avoiding excessive repetitive polls. The last part * is achieved via "hooking on" other polls by awaiting on {@link PeriodicDatabasePoll#firstPollCompletionFuture} or - * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #awaitOrPerformDatabasePoll} method + * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} method * implementation for details. * * Note: the overall implementation of periodic/on-demand polls is not completely optimal: for example, when the @@ -194,7 +200,7 @@ long nanosElapsedFromInitiation() * during Coordinator leadership switches is not a priority. 
* * This field is {@code volatile} because it's checked and updated in a double-checked locking manner in {@link - * #awaitOrPerformDatabasePoll()}. + * #useLatestIfWithinDelayOrPerformNewDatabasePoll()}. */ private volatile @Nullable DatabasePoll latestDatabasePoll = null; @@ -311,6 +317,22 @@ public void startPollingDatabasePeriodically() private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePoll periodicDatabasePoll) { return () -> { + // If latest poll was an OnDemandDatabasePoll that started less than periodicPollDelay, + // We will wait for (periodicPollDelay - currentTime - LatestOnDemandDatabasePollStartTime) then check again. + try { + long periodicPollDelayNanos = TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis()); + while (latestDatabasePoll != null + && latestDatabasePoll instanceof OnDemandDatabasePoll + && ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation() < periodicPollDelayNanos) { + long sleepNano = periodicPollDelayNanos + - ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation(); + TimeUnit.NANOSECONDS.sleep(sleepNano); + } + } + catch (Exception e) { + log.debug(e, "Exception found while waiting for next periodic poll"); + } + // poll() is synchronized together with startPollingDatabasePeriodically(), stopPollingDatabasePeriodically() and // isPollingDatabasePeriodically() to ensure that when stopPollingDatabasePeriodically() exits, poll() won't // actually run anymore after that (it could only enter the synchronized section and exit immediately because the @@ -320,8 +342,10 @@ private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePo lock.lock(); try { if (startOrder == currentStartPollingOrder) { + periodicDatabasePoll.lastPollStartTimestampInMs = System.currentTimeMillis(); poll(); periodicDatabasePoll.firstPollCompletionFuture.complete(null); + latestDatabasePoll = periodicDatabasePoll; } else { log.debug("startOrder = currentStartPollingOrder = 
%d, skipping poll()", startOrder); } @@ -381,16 +405,16 @@ public void stopPollingDatabasePeriodically() } } - private void awaitOrPerformDatabasePoll() + private void useLatestIfWithinDelayOrPerformNewDatabasePoll() { - // Double-checked locking with awaitLatestDatabasePoll() call playing the role of the "check". - if (awaitLatestDatabasePoll()) { + // Double-checked locking with useLatestSnapshotIfWithinDelay() call playing the role of the "check". + if (useLatestSnapshotIfWithinDelay()) { return; } ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock(); lock.lock(); try { - if (awaitLatestDatabasePoll()) { + if (useLatestSnapshotIfWithinDelay()) { return; } OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll(); @@ -403,11 +427,17 @@ private void awaitOrPerformDatabasePoll() } /** - * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is - * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise, - * meaning that a new on-demand database poll should be initiated. + * This method returns true without waiting for a database poll if the latest {@link DatabasePoll} is a + * {@link PeriodicDatabasePoll} that has completed its first poll, or an {@link OnDemandDatabasePoll} that is + * made not longer than {@link #periodicPollDelay} from current time. + * This method does wait until completion if the latest {@link DatabasePoll} is a + * {@link PeriodicDatabasePoll} that has not completed its first poll, or an {@link OnDemandDatabasePoll} that is + * already in the process of polling the database. + * This means that any method using this check can read from a snapshot that is + * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old. 
*/ - private boolean awaitLatestDatabasePoll() + @VisibleForTesting + boolean useLatestSnapshotIfWithinDelay() { DatabasePoll latestDatabasePoll = this.latestDatabasePoll; if (latestDatabasePoll instanceof PeriodicDatabasePoll) { @@ -430,6 +460,49 @@ private boolean awaitLatestDatabasePoll() return false; } + /** + * This method will always force a database poll if there is no ongoing database poll. This method will then + * waits for the new poll or the ongoing poll to completes before returning. + * This means that any method using this check can be sure that the latest poll for the snapshot was completed after + * this method was called. + */ + @VisibleForTesting + void forceOrWaitOngoingDatabasePoll() + { + long checkStartTime = System.currentTimeMillis(); + ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock(); + lock.lock(); + try { + DatabasePoll latestDatabasePoll = this.latestDatabasePoll; + try { + //Verify if there was a periodic poll completed while we were waiting for the lock + if (latestDatabasePoll instanceof PeriodicDatabasePoll + && ((PeriodicDatabasePoll) latestDatabasePoll).lastPollStartTimestampInMs > checkStartTime) { + return; + } + // Verify if there was a on-demand poll completed while we were waiting for the lock + if (latestDatabasePoll instanceof OnDemandDatabasePoll) { + long checkStartTimeNanos = TimeUnit.MILLISECONDS.toNanos(checkStartTime); + OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) latestDatabasePoll; + if (latestOnDemandPoll.initiationTimeNanos > checkStartTimeNanos) { + return; + } + } + } + catch (Exception e) { + // Latest poll was unsuccessful, try to do a new poll + log.debug(e, "Latest poll was unsuccessful. 
Starting a new poll..."); + } + // Force a database poll + OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll(); + this.latestDatabasePoll = onDemandDatabasePoll; + doOnDemandPoll(onDemandDatabasePoll); + } + finally { + lock.unlock(); + } + } + private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll) { try { @@ -857,19 +930,44 @@ public Set getOvershadowedSegments() @Override public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments() { - awaitOrPerformDatabasePoll(); + useLatestIfWithinDelayOrPerformNewDatabasePoll(); return dataSourcesSnapshot; } + @VisibleForTesting + DataSourcesSnapshot getDataSourcesSnapshot() + { + return dataSourcesSnapshot; + } + + @VisibleForTesting + DatabasePoll getLatestDatabasePoll() + { + return latestDatabasePoll; + } + + @Override public Iterable iterateAllUsedSegments() { - awaitOrPerformDatabasePoll(); - return () -> dataSourcesSnapshot - .getDataSourcesWithAllUsedSegments() - .stream() - .flatMap(dataSource -> dataSource.getSegments().stream()) - .iterator(); + useLatestIfWithinDelayOrPerformNewDatabasePoll(); + return dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot(); + } + + @Override + public Optional> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, + Interval interval, + boolean requiresLatest) + { + if (requiresLatest) { + forceOrWaitOngoingDatabasePoll(); + } else { + useLatestIfWithinDelayOrPerformNewDatabasePoll(); + } + VersionedIntervalTimeline usedSegmentsTimeline + = dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource); + return Optional.fromNullable(usedSegmentsTimeline) + .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE)); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index f8c3f43c76f5..7ef2f66f2339 100644 --- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -256,6 +256,17 @@ public Map getLoadManagementPeons() * @return tier -> { dataSource -> underReplicationCount } map */ public Map> computeUnderReplicationCountsPerDataSourcePerTier() + { + final Iterable dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); + return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments); + } + + /** + * @return tier -> { dataSource -> underReplicationCount } map + */ + public Map> computeUnderReplicationCountsPerDataSourcePerTierForSegments( + Iterable dataSegments + ) { final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); @@ -263,8 +274,6 @@ public Map> computeUnderReplicationCountsPerDataS return underReplicationCountsPerDataSourcePerTier; } - final Iterable dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); - final DateTime now = DateTimes.nowUtc(); for (final DataSegment segment : dataSegments) { @@ -320,7 +329,7 @@ public Map getLoadStatus() for (ImmutableDruidDataSource dataSource : dataSources) { final Set segments = Sets.newHashSet(dataSource.getSegments()); - final int numUsedSegments = segments.size(); + final int numPublishedSegments = segments.size(); // remove loaded segments for (DruidServer druidServer : serverInventoryView.getInventory()) { @@ -333,10 +342,10 @@ public Map getLoadStatus() } } } - final int numUnloadedSegments = segments.size(); + final int numUnavailableSegments = segments.size(); loadStatus.put( dataSource.getName(), - 100 * ((double) (numUsedSegments - numUnloadedSegments) / (double) numUsedSegments) + 100 * ((double) (numPublishedSegments - numUnavailableSegments) / (double) numPublishedSegments) ); } diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java 
index 040297521b80..8ea4dc06ce2e 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -22,11 +22,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; +import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.commons.lang.StringUtils; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; @@ -49,6 +51,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.http.security.DatasourceResourceFilter; @@ -96,12 +99,14 @@ public class DataSourcesResource { private static final Logger log = new Logger(DataSourcesResource.class); + private static final long DEFAULT_LOADSTATUS_INTERVAL_OFFSET = 14 * 24 * 60 * 60 * 1000; private final CoordinatorServerView serverInventoryView; private final SegmentsMetadataManager segmentsMetadataManager; private final MetadataRuleManager metadataRuleManager; private final IndexingServiceClient indexingServiceClient; private final AuthorizerMapper authorizerMapper; + private final DruidCoordinator coordinator; @Inject public DataSourcesResource( @@ -109,7 +114,8 @@ public DataSourcesResource( SegmentsMetadataManager segmentsMetadataManager, MetadataRuleManager metadataRuleManager, 
@Nullable IndexingServiceClient indexingServiceClient, - AuthorizerMapper authorizerMapper + AuthorizerMapper authorizerMapper, + DruidCoordinator coordinator ) { this.serverInventoryView = serverInventoryView; @@ -117,6 +123,7 @@ public DataSourcesResource( this.metadataRuleManager = metadataRuleManager; this.indexingServiceClient = indexingServiceClient; this.authorizerMapper = authorizerMapper; + this.coordinator = coordinator; } @GET @@ -391,6 +398,130 @@ public Response getServedSegmentsInInterval( return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains); } + @GET + @Path("/{dataSourceName}/loadstatus") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(DatasourceResourceFilter.class) + public Response getDatasourceLoadstatus( + @PathParam("dataSourceName") String dataSourceName, + @QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh, + @QueryParam("interval") @Nullable final String interval, + @QueryParam("simple") @Nullable final String simple, + @QueryParam("full") @Nullable final String full + ) + { + if (forceMetadataRefresh == null) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("Invalid request. 
forceMetadataRefresh must be specified") + .build(); + } + final Interval theInterval; + if (interval == null) { + long currentTimeInMs = System.currentTimeMillis(); + theInterval = Intervals.utc(currentTimeInMs - DEFAULT_LOADSTATUS_INTERVAL_OFFSET, currentTimeInMs); + } else { + theInterval = Intervals.of(interval.replace('_', '/')); + } + + Optional> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + dataSourceName, + theInterval, + forceMetadataRefresh + ); + + if (!segments.isPresent()) { + return logAndCreateDataSourceNotFoundResponse(dataSourceName); + } + + if (Iterables.size(segments.get()) == 0) { + return Response + .status(Response.Status.NO_CONTENT) + .entity("No used segment found for the given datasource and interval") + .build(); + } + + if (simple != null) { + // Calculate response for simple mode + SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get()); + return Response.ok( + ImmutableMap.of( + dataSourceName, + segmentsLoadStatistics.getNumUnavailableSegments() + ) + ).build(); + } else if (full != null) { + // Calculate response for full mode + Map> segmentLoadMap + = coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get()); + if (segmentLoadMap.isEmpty()) { + return Response.serverError() + .entity("Coordinator segment replicant lookup is not initialized yet. 
Try again later.") + .build(); + } + return Response.ok(segmentLoadMap).build(); + } else { + // Calculate response for default mode + SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get()); + return Response.ok( + ImmutableMap.of( + dataSourceName, + 100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) / (double) segmentsLoadStatistics.getNumPublishedSegments()) + ) + ).build(); + } + } + + private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable segments) + { + Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); + int numPublishedSegments = 0; + int numUnavailableSegments = 0; + int numLoadedSegments = 0; + for (DataSegment segment : segments) { + numPublishedSegments++; + if (!segmentLoadInfos.containsKey(segment.getId())) { + numUnavailableSegments++; + } else { + numLoadedSegments++; + } + } + return new SegmentsLoadStatistics(numPublishedSegments, numUnavailableSegments, numLoadedSegments); + } + + private static class SegmentsLoadStatistics + { + private int numPublishedSegments; + private int numUnavailableSegments; + private int numLoadedSegments; + + SegmentsLoadStatistics( + int numPublishedSegments, + int numUnavailableSegments, + int numLoadedSegments + ) + { + this.numPublishedSegments = numPublishedSegments; + this.numUnavailableSegments = numUnavailableSegments; + this.numLoadedSegments = numLoadedSegments; + } + + public int getNumPublishedSegments() + { + return numPublishedSegments; + } + + public int getNumUnavailableSegments() + { + return numUnavailableSegments; + } + + public int getNumLoadedSegments() + { + return numLoadedSegments; + } + } + /** * The property names belong to the public HTTP JSON API. 
*/ diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index be6354a63c75..57df44073460 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -20,11 +20,13 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -43,6 +45,7 @@ import org.junit.Test; import java.io.IOException; +import java.util.Set; import java.util.stream.Collectors; @@ -117,7 +120,7 @@ public void setUp() throws Exception { TestDerbyConnector connector = derbyConnectorRule.getConnector(); SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(); - config.setPollDuration(Period.seconds(1)); + config.setPollDuration(Period.seconds(3)); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), @@ -148,30 +151,124 @@ public void teardown() } @Test - public void testPoll() + public void testPollPeriodically() { + DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertNull(dataSourcesSnapshot); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); - sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + // This call make sure that the first poll is 
completed + sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay(); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertEquals( ImmutableSet.of("wikipedia"), sqlSegmentsMetadataManager.retrieveAllDataSourceNames() ); Assert.assertEquals( ImmutableList.of("wikipedia"), - sqlSegmentsMetadataManager - .getImmutableDataSourcesWithAllUsedSegments() - .stream() - .map(ImmutableDruidDataSource::getName) - .collect(Collectors.toList()) + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) ); Assert.assertEquals( ImmutableSet.of(segment1, segment2), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia").getSegments()) + ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments()) ); Assert.assertEquals( ImmutableSet.of(segment1, segment2), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot()) + ); + } + + @Test + public void testPollOnDemand() + { + DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertNull(dataSourcesSnapshot); + // This should return false and not wait/poll anything as we did not schedule periodic poll + Assert.assertFalse(sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay()); + Assert.assertNull(dataSourcesSnapshot); + // This call will force on demand poll + sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll(); + Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll); + dataSourcesSnapshot = 
sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableSet.of("wikipedia"), + sqlSegmentsMetadataManager.retrieveAllDataSourceNames() + ); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) + ); + Assert.assertEquals( + ImmutableSet.of(segment1, segment2), + ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments()) + ); + Assert.assertEquals( + ImmutableSet.of(segment1, segment2), + ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot()) + ); + } + + @Test(timeout = 60_000) + public void testPollPeriodicallyAndOnDemandInterleave() throws Exception + { + DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertNull(dataSourcesSnapshot); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + // This call make sure that the first poll is completed + sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay(); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) + ); + final String newDataSource2 = "wikipedia2"; + final DataSegment newSegment2 = createNewSegment1(newDataSource2); + publisher.publishSegment(newSegment2); + + // This call will force on demand poll + sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + 
Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll); + // New datasource should now be in the snapshot since we just force on demand poll. + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableList.of("wikipedia2", "wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) + ); + + final String newDataSource3 = "wikipedia3"; + final DataSegment newSegment3 = createNewSegment1(newDataSource3); + publisher.publishSegment(newSegment3); + + // This time wait for periodic poll (not doing on demand poll so we have to wait a bit...) + while (sqlSegmentsMetadataManager.getDataSourcesSnapshot().getDataSource(newDataSource3) == null) { + Thread.sleep(1000); + } + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableList.of("wikipedia2", "wikipedia3", "wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) ); } @@ -749,4 +846,46 @@ public void testStopAndStart() sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.stopPollingDatabasePeriodically(); } + + @Test + public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() throws Exception + { + final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000"); + Optional> segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + "wikipedia", theInterval, true + ); + Assert.assertTrue(segments.isPresent()); + Set 
dataSegmentSet = ImmutableSet.copyOf(segments.get()); + Assert.assertEquals(1, dataSegmentSet.size()); + Assert.assertTrue(dataSegmentSet.contains(segment1)); + + final DataSegment newSegment2 = createSegment( + "wikipedia", + "2012-03-16T00:00:00.000/2012-03-17T00:00:00.000", + "2017-10-15T20:19:12.565Z", + "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", + 0 + ); + publisher.publishSegment(newSegment2); + + // New segment is not returned since we call without force poll + segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + "wikipedia", theInterval, false + ); + Assert.assertTrue(segments.isPresent()); + dataSegmentSet = ImmutableSet.copyOf(segments.get()); + Assert.assertEquals(1, dataSegmentSet.size()); + Assert.assertTrue(dataSegmentSet.contains(segment1)); + + // New segment is returned since we call with force poll + segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + "wikipedia", theInterval, true + ); + Assert.assertTrue(segments.isPresent()); + dataSegmentSet = ImmutableSet.copyOf(segments.get()); + Assert.assertEquals(2, dataSegmentSet.size()); + Assert.assertTrue(dataSegmentSet.contains(segment1)); + Assert.assertTrue(dataSegmentSet.contains(newSegment2)); + } + } diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 10a7f97300c9..39e02ae86def 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -19,11 +19,14 @@ package org.apache.druid.server.http; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import 
com.google.common.collect.Sets; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; @@ -39,6 +42,7 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.rules.IntervalDropRule; import org.apache.druid.server.coordinator.rules.IntervalLoadRule; import org.apache.druid.server.coordinator.rules.Rule; @@ -176,7 +180,7 @@ public void testGetFullQueryableDataSources() EasyMock.replay(inventoryView, server, request); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER); + new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null); Response response = dataSourcesResource.getQueryableDataSources("full", null, request); Set result = (Set) response.getEntity(); Assert.assertEquals(200, response.getStatus()); @@ -250,7 +254,7 @@ public Access authorize(AuthenticationResult authenticationResult1, Resource res } }; - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper, null); Response response = dataSourcesResource.getQueryableDataSources("full", null, request); Set result = (Set) response.getEntity(); @@ -289,7 +293,7 @@ public void testGetSimpleQueryableDataSources() EasyMock.replay(inventoryView, server, request); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER); + new 
DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null); Response response = dataSourcesResource.getQueryableDataSources(null, "simple", request); Assert.assertEquals(200, response.getStatus()); List> results = (List>) response.getEntity(); @@ -313,7 +317,7 @@ public void testFullGetTheDataSource() EasyMock.replay(inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", "full"); ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity(); Assert.assertEquals(200, response.getStatus()); @@ -329,7 +333,7 @@ public void testNullGetTheDataSource() EasyMock.replay(inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Assert.assertEquals(204, dataSourcesResource.getDataSource("none", null).getStatus()); EasyMock.verify(inventoryView, server); } @@ -347,7 +351,7 @@ public void testSimpleGetTheDataSource() EasyMock.replay(inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map> result = (Map>) response.getEntity(); @@ -380,7 +384,7 @@ public void testSimpleGetTheDataSourceManyTiers() EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server, server2, server3)).atLeastOnce(); EasyMock.replay(inventoryView, server, server2, server3); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null); + DataSourcesResource 
dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map> result = (Map>) response.getEntity(); @@ -418,7 +422,7 @@ public void testSimpleGetTheDataSourceWithReplicatedSegments() EasyMock.replay(inventoryView); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map> result1 = (Map>) response.getEntity(); @@ -463,7 +467,7 @@ public void testGetSegmentDataSourceIntervals() expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z")); expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z")); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getIntervalsWithServedSegmentsOrAllServedSegmentsPerIntervals( "invalidDataSource", @@ -523,7 +527,7 @@ public void testGetServedSegmentsInIntervalInDataSource() EasyMock.replay(inventoryView); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getServedSegmentsInInterval( "invalidDataSource", "2010-01-01/P1D", @@ -593,7 +597,7 @@ public void testKillSegmentsInIntervalInDataSource() EasyMock.replay(indexingServiceClient, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null); + new 
DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null); Response response = dataSourcesResource.killUnusedSegmentsInInterval("datasource1", interval); Assert.assertEquals(200, response.getStatus()); @@ -607,7 +611,7 @@ public void testMarkAsUnusedAllSegmentsInDataSource() IndexingServiceClient indexingServiceClient = EasyMock.createStrictMock(IndexingServiceClient.class); EasyMock.replay(indexingServiceClient, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null); + new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null); try { Response response = dataSourcesResource.markAsUnusedAllSegmentsOrKillUnusedSegmentsInInterval("datasource", "true", "???"); @@ -630,7 +634,7 @@ public void testIsHandOffComplete() Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null); Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z")); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null); + new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null, null); // test dropped EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1")) @@ -699,7 +703,7 @@ public void testMarkSegmentAsUsed() EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(true).once(); EasyMock.replay(segmentsMetadataManager); - DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString()); Assert.assertEquals(200, response.getStatus()); @@ -713,7 +717,7 @@ public void 
testMarkSegmentAsUsedNoChange() EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(false).once(); EasyMock.replay(segmentsMetadataManager); - DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString()); Assert.assertEquals(200, response.getStatus()); @@ -734,7 +738,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInterval() EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -757,7 +761,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsIntervalNoneUpdated() EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -780,7 +784,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsSet() throws UnknownSegmentIdsE EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -803,7 +807,7 @@ 
public void testMarkAsUsedNonOvershadowedSegmentsIntervalException() EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -821,7 +825,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource() EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -835,7 +839,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource() public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadNoArguments() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -848,7 +852,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadNoArguments() public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadBothArguments() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -861,7 +865,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadBothArguments() public void 
testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadEmptyArray() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -874,7 +878,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadEmptyArray() public void testMarkAsUsedNonOvershadowedSegmentsNoPayload() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null); Assert.assertEquals(400, response.getStatus()); @@ -1026,7 +1030,7 @@ public void testMarkSegmentsAsUnused() new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity()); @@ -1049,7 +1053,7 @@ public void testMarkSegmentsAsUnusedNoChanges() new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); 
Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -1074,7 +1078,7 @@ public void testMarkSegmentsAsUnusedException() new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(500, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1096,7 +1100,7 @@ public void testMarkAsUnusedSegmentsInInterval() new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity()); @@ -1119,7 +1123,7 @@ public void testMarkAsUnusedSegmentsInIntervalNoChanges() new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -1143,7 +1147,7 @@ public void testMarkAsUnusedSegmentsInIntervalException() new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); DataSourcesResource dataSourcesResource = - new 
DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(500, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1154,7 +1158,7 @@ public void testMarkAsUnusedSegmentsInIntervalException() public void testMarkSegmentsAsUnusedNullPayload() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", null); Assert.assertEquals(400, response.getStatus()); @@ -1169,7 +1173,7 @@ public void testMarkSegmentsAsUnusedNullPayload() public void testMarkSegmentsAsUnusedInvalidPayload() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); final DataSourcesResource.MarkDataSourceSegmentsPayload payload = new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null); @@ -1183,7 +1187,7 @@ public void testMarkSegmentsAsUnusedInvalidPayload() public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); final DataSourcesResource.MarkDataSourceSegmentsPayload payload = new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"), ImmutableSet.of()); @@ -1193,6 +1197,251 @@ public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments() Assert.assertNotNull(response.getEntity()); } + @Test + public void 
testGetDatasourceLoadstatusForceMetadataRefreshNull() + { + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testGetDatasourceLoadstatusNoSegmentForInterval() + { + List segments = ImmutableList.of(); + // Test when datasource fully loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq( + "datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.replay(segmentsMetadataManager); + + DataSourcesResource dataSourcesResource = new DataSourcesResource( + inventoryView, + segmentsMetadataManager, + null, + null, + null, + null + ); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null); + Assert.assertEquals(204, response.getStatus()); + } + + @Test + public void testGetDatasourceLoadstatusDefault() + { + DataSegment datasource1Segment1 = new DataSegment( + "datasource1", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 10 + ); + + DataSegment datasource1Segment2 = new DataSegment( + "datasource1", + Intervals.of("2010-01-22/P1D"), + "", + null, + null, + null, + null, + 0x9, + 20 + ); + DataSegment datasource2Segment1 = new DataSegment( + "datasource2", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 30 + ); + List segments = ImmutableList.of(datasource1Segment1, datasource1Segment2); + Map completedLoadInfoMap = ImmutableMap.of( + datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1), + datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2), + datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1) + ); + Map 
halfLoadedInfoMap = ImmutableMap.of( + datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1) + ); + + // Test when datasource fully loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView); + + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(1, ((Map) response.getEntity()).size()); + Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1")); + Assert.assertEquals(100.0, ((Map) response.getEntity()).get("datasource1")); + EasyMock.verify(segmentsMetadataManager, inventoryView); + EasyMock.reset(segmentsMetadataManager, inventoryView); + + // Test when datasource half loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView); + + dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); + response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(1, ((Map) 
response.getEntity()).size()); + Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1")); + Assert.assertEquals(50.0, ((Map) response.getEntity()).get("datasource1")); + EasyMock.verify(segmentsMetadataManager, inventoryView); + } + + @Test + public void testGetDatasourceLoadstatusSimple() + { + DataSegment datasource1Segment1 = new DataSegment( + "datasource1", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 10 + ); + + DataSegment datasource1Segment2 = new DataSegment( + "datasource1", + Intervals.of("2010-01-22/P1D"), + "", + null, + null, + null, + null, + 0x9, + 20 + ); + DataSegment datasource2Segment1 = new DataSegment( + "datasource2", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 30 + ); + List segments = ImmutableList.of(datasource1Segment1, datasource1Segment2); + Map completedLoadInfoMap = ImmutableMap.of( + datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1), + datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2), + datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1) + ); + Map halfLoadedInfoMap = ImmutableMap.of( + datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1) + ); + + // Test when datasource fully loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView); + + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null); + Assert.assertEquals(200, response.getStatus()); + 
Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(1, ((Map) response.getEntity()).size()); + Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1")); + Assert.assertEquals(0, ((Map) response.getEntity()).get("datasource1")); + EasyMock.verify(segmentsMetadataManager, inventoryView); + EasyMock.reset(segmentsMetadataManager, inventoryView); + + // Test when datasource half loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView); + + dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); + response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(1, ((Map) response.getEntity()).size()); + Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1")); + Assert.assertEquals(1, ((Map) response.getEntity()).get("datasource1")); + EasyMock.verify(segmentsMetadataManager, inventoryView); + } + + @Test + public void testGetDatasourceLoadstatusFull() + { + DataSegment datasource1Segment1 = new DataSegment( + "datasource1", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 10 + ); + + DataSegment datasource1Segment2 = new DataSegment( + "datasource1", + Intervals.of("2010-01-22/P1D"), + "", + null, + null, + null, + null, + 0x9, + 20 + ); + List segments = ImmutableList.of(datasource1Segment1, datasource1Segment2); + + final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); + Object2LongMap tier1 = new Object2LongOpenHashMap<>(); + 
tier1.put("datasource1", 0L); + Object2LongMap tier2 = new Object2LongOpenHashMap<>(); + tier2.put("datasource1", 3L); + underReplicationCountsPerDataSourcePerTier.put("tier1", tier1); + underReplicationCountsPerDataSourcePerTier.put("tier2", tier2); + + // Test when datasource fully loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class); + EasyMock.expect(druidCoordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments)) + .andReturn(underReplicationCountsPerDataSourcePerTier).once(); + + EasyMock.replay(segmentsMetadataManager, druidCoordinator); + + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, druidCoordinator); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full"); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(2, ((Map) response.getEntity()).size()); + Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier1")).size()); + Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier2")).size()); + Assert.assertEquals(0L, ((Map) ((Map) response.getEntity()).get("tier1")).get("datasource1")); + Assert.assertEquals(3L, ((Map) ((Map) response.getEntity()).get("tier2")).get("datasource1")); + EasyMock.verify(segmentsMetadataManager); + } + private DruidServerMetadata createRealtimeServerMetadata(String name) { return createServerMetadata(name, ServerType.REALTIME);