From bd95578e6a4875aa106b1710b3c1b78f46bbc12f Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 1 Jun 2020 17:57:56 -1000 Subject: [PATCH 01/22] API to verify a datasource has the latest ingested data --- .../druid/client/CoordinatorServerView.java | 4 + .../metadata/SegmentsMetadataManager.java | 12 ++ .../metadata/SqlSegmentsMetadataManager.java | 118 +++++++++++++++--- .../server/http/DataSourcesResource.java | 33 +++++ 4 files changed, 147 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java index 2517a8f0e9be..538cc2f526f3 100644 --- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java @@ -200,6 +200,10 @@ public VersionedIntervalTimeline getTimeline(DataSource } } + public Map getSegmentLoadInfos() + { + return segmentLoadInfos; + } @Override public DruidServer getInventoryValue(String serverKey) diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 4f97b158021e..d1ba784dbba2 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -113,6 +113,18 @@ int markAsUsedNonOvershadowedSegments(String dataSource, Set segmentIds) */ Iterable iterateAllUsedSegments(); + /** + * Returns an iterable to go over all used and non-overshadowed segments of given data sources over given interval. + * The order in which segments are iterated is unspecified. + * If {@param requiresLatest} is true then a force metadatastore poll will be triggered. This can cause a longer + * response time but will ensure that the latest segment information (at the time this method is called) is returned. 
+ * If {@param requiresLatest} is false then segment information from stale snapshot of up to the last periodic poll + * period {@link SqlSegmentsMetadataManager#periodicPollDelay} will be used. + */ + Iterable iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, + Interval interval, + boolean requiresLatest); + /** * Retrieves all data source names for which there are segment in the database, regardless of whether those segments * are used or not. If there are no segments in the database, returns an empty set. diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 92a874863527..3aab241c1b6e 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -80,6 +81,8 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import static java.lang.Thread.sleep; + /** * */ @@ -104,6 +107,7 @@ private static class PeriodicDatabasePoll implements DatabasePoll * leadership changes. */ final CompletableFuture firstPollCompletionFuture = new CompletableFuture<>(); + long lastPollStartTimestampInMs = -1; } /** @@ -127,7 +131,7 @@ long nanosElapsedFromInitiation() * called at the same time if two different threads are calling them. This might be possible if Coordinator gets and * drops leadership repeatedly in quick succession. 
* - * This lock is also used to synchronize {@link #awaitOrPerformDatabasePoll} for times when SqlSegmentsMetadataManager + * This lock is also used to synchronize {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} for times when SqlSegmentsMetadataManager * is not polling the database periodically (in other words, when the Coordinator is not the leader). */ private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock(); @@ -155,7 +159,7 @@ long nanosElapsedFromInitiation() * easy to forget to do. * * This field may be updated from {@link #exec}, or from whatever thread calling {@link #doOnDemandPoll} via {@link - * #awaitOrPerformDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager. + * #useLatestIfWithinDelayOrPerformNewDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager. */ private volatile @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = null; @@ -170,7 +174,7 @@ long nanosElapsedFromInitiation() * Note that if there is a happens-before relationship between a call to {@link #startPollingDatabasePeriodically()} * (on Coordinators' leadership change) and one of the methods accessing the {@link #dataSourcesSnapshot}'s state in * this class the latter is guaranteed to await for the initiated periodic poll. This is because when the latter - * method calls to {@link #awaitLatestDatabasePoll()} via {@link #awaitOrPerformDatabasePoll}, they will + * method calls to {@link #useLatestSnapshotIfWithinDelay()} via {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll}, they will * see the latest {@link PeriodicDatabasePoll} value (stored in this field, latestDatabasePoll, in {@link * #startPollingDatabasePeriodically()}) and to await on its {@link PeriodicDatabasePoll#firstPollCompletionFuture}. 
* @@ -185,7 +189,7 @@ long nanosElapsedFromInitiation() * SegmentsMetadataManager} and guarantee that it always returns consistent and relatively up-to-date data from methods * like {@link #getImmutableDataSourceWithUsedSegments}, while avoiding excessive repetitive polls. The last part * is achieved via "hooking on" other polls by awaiting on {@link PeriodicDatabasePoll#firstPollCompletionFuture} or - * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #awaitOrPerformDatabasePoll} method + * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} method * implementation for details. * * Note: the overall implementation of periodic/on-demand polls is not completely optimal: for example, when the @@ -194,7 +198,7 @@ long nanosElapsedFromInitiation() * during Coordinator leadership switches is not a priority. * * This field is {@code volatile} because it's checked and updated in a double-checked locking manner in {@link - * #awaitOrPerformDatabasePoll()}. + * #useLatestIfWithinDelayOrPerformNewDatabasePoll()}. */ private volatile @Nullable DatabasePoll latestDatabasePoll = null; @@ -311,6 +315,21 @@ public void startPollingDatabasePeriodically() private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePoll periodicDatabasePoll) { return () -> { + // If latest poll was an OnDemandDatabasePoll that started less than periodicPollDelay, + // We will wait for (periodicPollDelay - currentTime - LatestOnDemandDatabasePollStartTime) then check again. 
+ try { + long periodicPollDelayNanos = TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis()); + while (latestDatabasePoll != null + && latestDatabasePoll instanceof OnDemandDatabasePoll + && ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation() < periodicPollDelayNanos) { + long sleepNano = periodicPollDelayNanos + - ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation(); + TimeUnit.NANOSECONDS.sleep(sleepNano); + } + } catch (Exception e) { + log.debug(e, "Exception found while waiting for next periodic poll"); + } + // poll() is synchronized together with startPollingDatabasePeriodically(), stopPollingDatabasePeriodically() and // isPollingDatabasePeriodically() to ensure that when stopPollingDatabasePeriodically() exits, poll() won't // actually run anymore after that (it could only enter the synchronized section and exit immediately because the @@ -320,8 +339,10 @@ private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePo lock.lock(); try { if (startOrder == currentStartPollingOrder) { + periodicDatabasePoll.lastPollStartTimestampInMs = System.currentTimeMillis(); poll(); periodicDatabasePoll.firstPollCompletionFuture.complete(null); + latestDatabasePoll = periodicDatabasePoll; } else { log.debug("startOrder = currentStartPollingOrder = %d, skipping poll()", startOrder); } @@ -381,16 +402,16 @@ public void stopPollingDatabasePeriodically() } } - private void awaitOrPerformDatabasePoll() + private void useLatestIfWithinDelayOrPerformNewDatabasePoll() { - // Double-checked locking with awaitLatestDatabasePoll() call playing the role of the "check". - if (awaitLatestDatabasePoll()) { + // Double-checked locking with useLatestSnapshotIfWithinDelay() call playing the role of the "check". 
+ if (useLatestSnapshotIfWithinDelay()) { return; } ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock(); lock.lock(); try { - if (awaitLatestDatabasePoll()) { + if (useLatestSnapshotIfWithinDelay()) { return; } OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll(); @@ -403,11 +424,16 @@ private void awaitOrPerformDatabasePoll() } /** - * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is - * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise, - * meaning that a new on-demand database poll should be initiated. + * This method returns true without waiting for database poll if the latest {@link DatabasePoll} is a + * {@link PeriodicDatabasePoll} that has completed it's first poll, or an {@link OnDemandDatabasePoll} that is + * made not longer than {@link #periodicPollDelay} from current time. + * This method does wait untill completion for if the latest {@link DatabasePoll} is a + * {@link PeriodicDatabasePoll} that has not completed it's first poll, or an {@link OnDemandDatabasePoll} that is + * alrady in the process of polling the database. + * This means that any method using this check can read from snapshot that is + * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old. */ - private boolean awaitLatestDatabasePoll() + private boolean useLatestSnapshotIfWithinDelay() { DatabasePoll latestDatabasePoll = this.latestDatabasePoll; if (latestDatabasePoll instanceof PeriodicDatabasePoll) { @@ -430,6 +456,47 @@ private boolean awaitLatestDatabasePoll() return false; } + /** + * This method will always force a database poll if there is no ongoing database poll. This method will then + * waits for the new poll or the ongoing poll to completes before returning. + * This means that any method using this check can be sure that the latest poll for the snapshot was completed after + * this method was called. 
+ */ + public void forceOrWaitOngoingDatabasePoll() + { + long checkStartTime = System.currentTimeMillis(); + ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock(); + lock.lock(); + try { + DatabasePoll latestDatabasePoll = this.latestDatabasePoll; + try { + //Verify if there was a periodic poll completed while we were waiting for the lock + if (latestDatabasePoll instanceof PeriodicDatabasePoll + && ((PeriodicDatabasePoll) latestDatabasePoll).lastPollStartTimestampInMs > checkStartTime) { + return; + } + // Verify if there was a on-demand poll completed while we were waiting for the lock + if (latestDatabasePoll instanceof OnDemandDatabasePoll) { + long checkStartTimeNanos = TimeUnit.MILLISECONDS.toNanos(checkStartTime); + OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) latestDatabasePoll; + if (latestOnDemandPoll.initiationTimeNanos > checkStartTimeNanos) { + return; + } + } + } catch (Exception e) { + // Latest poll was unsuccessful, try to do a new poll + log.debug(e,"Latest poll was unsuccessful. 
Starting a new poll..."); + } + // Force a database poll + OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll(); + this.latestDatabasePoll = onDemandDatabasePoll; + doOnDemandPoll(onDemandDatabasePoll); + } + finally { + lock.unlock(); + } + } + private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll) { try { @@ -857,19 +924,30 @@ public Set getOvershadowedSegments() @Override public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments() { - awaitOrPerformDatabasePoll(); + useLatestIfWithinDelayOrPerformNewDatabasePoll(); return dataSourcesSnapshot; } @Override public Iterable iterateAllUsedSegments() { - awaitOrPerformDatabasePoll(); - return () -> dataSourcesSnapshot - .getDataSourcesWithAllUsedSegments() - .stream() - .flatMap(dataSource -> dataSource.getSegments().stream()) - .iterator(); + useLatestIfWithinDelayOrPerformNewDatabasePoll(); + return dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot(); + } + + @Override + public Iterable iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, + Interval interval, + boolean requiresLatest) + { + if (requiresLatest) { + forceOrWaitOngoingDatabasePoll(); + } else { + useLatestIfWithinDelayOrPerformNewDatabasePoll(); + } + VersionedIntervalTimeline usedSegmentsTimeline + = dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource); + return usedSegmentsTimeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE); } @Override diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 040297521b80..838de75ec27c 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -27,6 +27,7 @@ import com.google.common.collect.Iterables; import com.google.inject.Inject; import 
com.sun.jersey.spi.container.ResourceFilters; +import it.unimi.dsi.fastutil.objects.Object2IntMaps; import org.apache.commons.lang.StringUtils; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; @@ -391,6 +392,38 @@ public Response getServedSegmentsInInterval( return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains); } + @GET + @Path("/{dataSourceName}/loadstatus") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(DatasourceResourceFilter.class) + public Response getDatasourceLoadstatus( + @PathParam("dataSourceName") String dataSourceName, + @QueryParam("interval") @Nullable final String interval, + @QueryParam("firstCheck") @Nullable final Boolean firstCheck + ) + { + if (serverInventoryView == null || serverInventoryView.getSegmentLoadInfos() != null) { + return Response.ok(ImmutableMap.of("loaded", false)).build(); + } + // Force poll + Interval theInterval = interval == null ? Intervals.ETERNITY : Intervals.of(interval); + boolean requiresMetadataStorePoll = firstCheck == null ? true :firstCheck; + + Iterable segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + dataSourceName, + theInterval, + requiresMetadataStorePoll + ); + + Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); + for (DataSegment segment : segments) { + if (!segmentLoadInfos.containsKey(segment.getId())) { + return Response.ok(ImmutableMap.of("loaded", false)).build(); + } + } + return Response.ok(ImmutableMap.of("loaded", true)).build(); + } + /** * The property names belong to the public HTTP JSON API. 
*/ From 8d2749e34f0d74ccfc480da14c0b2826ef354116 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 1 Jun 2020 23:13:38 -1000 Subject: [PATCH 02/22] API to verify a datasource has the latest ingested data --- .../java/org/apache/druid/server/http/DataSourcesResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 838de75ec27c..ecc734cf1784 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -402,7 +402,7 @@ public Response getDatasourceLoadstatus( @QueryParam("firstCheck") @Nullable final Boolean firstCheck ) { - if (serverInventoryView == null || serverInventoryView.getSegmentLoadInfos() != null) { + if (serverInventoryView == null || serverInventoryView.getSegmentLoadInfos() == null) { return Response.ok(ImmutableMap.of("loaded", false)).build(); } // Force poll From aca82f3eafa1c9ca9b4cb6dde48b55442b86c0bd Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 2 Jun 2020 00:13:01 -1000 Subject: [PATCH 03/22] API to verify a datasource has the latest ingested data --- .../apache/druid/metadata/SegmentsMetadataManager.java | 7 ++++--- .../druid/metadata/SqlSegmentsMetadataManager.java | 10 ++++++---- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index d1ba784dbba2..222f765f286a 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.google.common.annotations.VisibleForTesting; +import 
com.google.common.base.Optional; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.timeline.DataSegment; @@ -121,9 +122,9 @@ int markAsUsedNonOvershadowedSegments(String dataSource, Set segmentIds) * If {@param requiresLatest} is false then segment information from stale snapshot of up to the last periodic poll * period {@link SqlSegmentsMetadataManager#periodicPollDelay} will be used. */ - Iterable iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, - Interval interval, - boolean requiresLatest); + Optional> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, + Interval interval, + boolean requiresLatest); /** * Retrieves all data source names for which there are segment in the database, regardless of whether those segments diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 3aab241c1b6e..e025970a5d8e 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -936,9 +937,9 @@ public Iterable iterateAllUsedSegments() } @Override - public Iterable iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, - Interval interval, - boolean requiresLatest) + public Optional> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, + Interval interval, + boolean requiresLatest) { if (requiresLatest) { forceOrWaitOngoingDatabasePoll(); @@ -947,7 +948,8 @@ public Iterable 
iterateAllUsedNonOvershadowedSegmentsForDatasourceI } VersionedIntervalTimeline usedSegmentsTimeline = dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource); - return usedSegmentsTimeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE); + return Optional.of(usedSegmentsTimeline) + .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE)); } @Override From 1a1a0cdb13bc473b81e4ecf7a9e76afb035a72e5 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 2 Jun 2020 00:21:08 -1000 Subject: [PATCH 04/22] API to verify a datasource has the latest ingested data --- .../apache/druid/server/http/DataSourcesResource.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index ecc734cf1784..5ae127b8851d 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -409,14 +410,19 @@ public Response getDatasourceLoadstatus( Interval theInterval = interval == null ? Intervals.ETERNITY : Intervals.of(interval); boolean requiresMetadataStorePoll = firstCheck == null ? 
true :firstCheck; - Iterable segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + Optional> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( dataSourceName, theInterval, requiresMetadataStorePoll ); + if (!segments.isPresent()) { + return logAndCreateDataSourceNotFoundResponse(dataSourceName); + } + + Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); - for (DataSegment segment : segments) { + for (DataSegment segment : segments.get()) { if (!segmentLoadInfos.containsKey(segment.getId())) { return Response.ok(ImmutableMap.of("loaded", false)).build(); } From 6dc035cb2d5dcf5796c7ad516ab39700ef7204a5 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 2 Jun 2020 00:48:56 -1000 Subject: [PATCH 05/22] API to verify a datasource has the latest ingested data --- .../org/apache/druid/metadata/SqlSegmentsMetadataManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index e025970a5d8e..52f62437e7e2 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -948,7 +948,7 @@ public Optional> iterateAllUsedNonOvershadowedSegmentsForD } VersionedIntervalTimeline usedSegmentsTimeline = dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource); - return Optional.of(usedSegmentsTimeline) + return Optional.fromNullable(usedSegmentsTimeline) .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE)); } From 0a7dc4e2a357a9094e08fcd8f4684abe1ff0ceae Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 5 Jun 2020 17:43:41 -1000 Subject: [PATCH 06/22] fix checksyle --- 
.../druid/metadata/SqlSegmentsMetadataManager.java | 10 +++++----- .../apache/druid/server/http/DataSourcesResource.java | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 52f62437e7e2..7d554971bd43 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -82,8 +82,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static java.lang.Thread.sleep; - /** * */ @@ -327,7 +325,8 @@ private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePo - ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation(); TimeUnit.NANOSECONDS.sleep(sleepNano); } - } catch (Exception e) { + } + catch (Exception e) { log.debug(e, "Exception found while waiting for next periodic poll"); } @@ -484,9 +483,10 @@ public void forceOrWaitOngoingDatabasePoll() return; } } - } catch (Exception e) { + } + catch (Exception e) { // Latest poll was unsuccessful, try to do a new poll - log.debug(e,"Latest poll was unsuccessful. Starting a new poll..."); + log.debug(e, "Latest poll was unsuccessful. 
Starting a new poll..."); } // Force a database poll OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll(); diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 5ae127b8851d..eee67f833a24 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -28,7 +28,6 @@ import com.google.common.collect.Iterables; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; -import it.unimi.dsi.fastutil.objects.Object2IntMaps; import org.apache.commons.lang.StringUtils; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; @@ -408,7 +407,7 @@ public Response getDatasourceLoadstatus( } // Force poll Interval theInterval = interval == null ? Intervals.ETERNITY : Intervals.of(interval); - boolean requiresMetadataStorePoll = firstCheck == null ? true :firstCheck; + boolean requiresMetadataStorePoll = firstCheck == null ? 
true : firstCheck; Optional> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( dataSourceName, From 0a019d1b2adbcfb7a92ffc496d3c21edb510ef86 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 9 Jun 2020 15:58:08 -1000 Subject: [PATCH 07/22] API to verify a datasource has the latest ingested data --- .../server/http/DataSourcesResource.java | 104 ++++++++++++++++-- 1 file changed, 93 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index eee67f833a24..6ca07b372ba7 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -24,10 +24,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Throwables; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.collect.Table; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.commons.lang.StringUtils; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; @@ -399,15 +403,21 @@ public Response getServedSegmentsInInterval( public Response getDatasourceLoadstatus( @PathParam("dataSourceName") String dataSourceName, @QueryParam("interval") @Nullable final String interval, - @QueryParam("firstCheck") @Nullable final Boolean firstCheck + @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh, + @QueryParam("simple") @Nullable final String simple, + @QueryParam("full") @Nullable final String full 
) { - if (serverInventoryView == null || serverInventoryView.getSegmentLoadInfos() == null) { - return Response.ok(ImmutableMap.of("loaded", false)).build(); + final Interval theInterval; + if (interval == null) { + long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000; + long currentTimeInMs = System.currentTimeMillis(); + theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs); + } else { + theInterval = Intervals.of(interval); } - // Force poll - Interval theInterval = interval == null ? Intervals.ETERNITY : Intervals.of(interval); - boolean requiresMetadataStorePoll = firstCheck == null ? true : firstCheck; + + boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh; Optional> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( dataSourceName, @@ -418,17 +428,89 @@ public Response getDatasourceLoadstatus( if (!segments.isPresent()) { return logAndCreateDataSourceNotFoundResponse(dataSourceName); } + Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); + if (simple != null) { + // Calculate resposne for simple mode + int numUnloadedSegments = 0; + for (DataSegment segment : segments.get()) { + if (!segmentLoadInfos.containsKey(segment.getId())) { + numUnloadedSegments++; + } + } + return Response.ok( + ImmutableMap.of( + dataSourceName, + numUnloadedSegments + ) + ).build(); + } else if (full != null) { + // Calculate resposne for full mode + final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); + final List rules = metadataRuleManager.getRulesWithDefault(dataSourceName); + final Table segmentsInCluster = HashBasedTable.create();; + final DateTime now = DateTimes.nowUtc(); - Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); - for (DataSegment segment : segments.get()) { - if (!segmentLoadInfos.containsKey(segment.getId())) { - return Response.ok(ImmutableMap.of("loaded", false)).build(); + for 
(DataSegment segment : segments.get()) { + for (DruidServer druidServer : serverInventoryView.getInventory()) { + String tier = druidServer.getTier(); + SegmentId segmentId = segment.getId(); + if (druidServer.getDataSource(dataSourceName).getSegment(segmentId) != null) { + Integer numReplicants = segmentsInCluster.get(segmentId, tier); + if (numReplicants == null) { + numReplicants = 0; + } + segmentsInCluster.put(segmentId, tier, numReplicants + 1); + } + } } + segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + dataSourceName, + theInterval, + false + ); + if (!segments.isPresent()) { + return logAndCreateDataSourceNotFoundResponse(dataSourceName); + } + for (DataSegment segment : segments.get()) { + for (final Rule rule : rules) { + if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) { + continue; + } + ((LoadRule) rule) + .getTieredReplicants() + .forEach((final String tier, final Integer ruleReplicants) -> { + Integer currentReplicantsRetVal = segmentsInCluster.get(segment.getId(), tier); + int currentReplicants = currentReplicantsRetVal == null ? 
0 : currentReplicantsRetVal; + Object2LongMap underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier + .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>()); + ((Object2LongOpenHashMap) underReplicationPerDataSource) + .addTo(dataSourceName, Math.max(ruleReplicants - currentReplicants, 0)); + }); + break; // only the first matching rule applies + } + } + return Response.ok(underReplicationCountsPerDataSourcePerTier).build(); + } else { + // Calculate resposne for default mode + int numUsedSegments = 0; + int numUnloadedSegments = 0; + for (DataSegment segment : segments.get()) { + numUsedSegments++; + if (!segmentLoadInfos.containsKey(segment.getId())) { + numUnloadedSegments++; + } + } + return Response.ok( + ImmutableMap.of( + dataSourceName, + 100 * ((double) (numUsedSegments - numUnloadedSegments) / (double) numUsedSegments) + ) + ).build(); } - return Response.ok(ImmutableMap.of("loaded", true)).build(); } + /** * The property names belong to the public HTTP JSON API. 
*/ From 49dc85b69325ccfe7515ad5a4870aa40567c292d Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 9 Jun 2020 16:05:33 -1000 Subject: [PATCH 08/22] API to verify a datasource has the latest ingested data --- .../apache/druid/metadata/SegmentsMetadataManager.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 222f765f286a..6e640a097990 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -122,9 +122,11 @@ int markAsUsedNonOvershadowedSegments(String dataSource, Set segmentIds) * If {@param requiresLatest} is false then segment information from stale snapshot of up to the last periodic poll * period {@link SqlSegmentsMetadataManager#periodicPollDelay} will be used. */ - Optional> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource, - Interval interval, - boolean requiresLatest); + Optional> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + String datasource, + Interval interval, + boolean requiresLatest + ); /** * Retrieves all data source names for which there are segment in the database, regardless of whether those segments From 46e5019df2fdf9dda0e5a3475fd6e877d5c23cf9 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 9 Jun 2020 22:07:46 -1000 Subject: [PATCH 09/22] API to verify a datasource has the latest ingested data --- docs/operations/api-reference.md | 35 +++ .../metadata/SqlSegmentsMetadataManager.java | 26 +- .../server/http/DataSourcesResource.java | 18 +- .../SqlSegmentsMetadataManagerTest.java | 159 +++++++++++- .../server/http/DataSourcesResourceTest.java | 234 ++++++++++++++++++ 5 files changed, 446 insertions(+), 26 deletions(-) diff --git a/docs/operations/api-reference.md 
b/docs/operations/api-reference.md index a66dd649f410..10ddec9ae237 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo Returns the serialized JSON of segments to load and drop for each Historical process. + +#### Segment Loading for Datasource + +Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` +(e.g., 2016-06-27_2016-06-28). + +These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query. +An example workflow for this is: +1. Submit your ingestion task +2. Repeatedly poll Overlod's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded. +3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with forceMetadataRefresh=true once. +If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. +4. Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with forceMetadataRefresh=false. +Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. + +##### GET + +* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}` + +Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource +over the given interval (or last 2 weeks if interval is not given). Setting forceMetadataRefresh to true +will force the coordinator to poll latest segment metadata from the metadatastore. forceMetadataRefresh will be set to true if not given. 
+ + * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}` + +Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource +over the given interval (or last 2 weeks if interval is not given). This does not include replication. Setting forceMetadataRefresh to true +will force the coordinator to poll latest segment metadata from the metadatastore. forceMetadataRefresh will be set to true if not given. + +* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}` + +Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource +over the given interval (or last 2 weeks if interval is not given). This includes replication. Setting forceMetadataRefresh to true +will force the coordinator to poll latest segment metadata from the metadatastore. forceMetadataRefresh will be set to true if not given. + #### Metadata store information ##### GET diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 7d554971bd43..2f4dd3cc27a1 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; @@ -97,7 +98,8 @@ private interface DatabasePoll {} /** Represents periodic {@link #poll}s happening from {@link #exec}. 
*/ - private static class PeriodicDatabasePoll implements DatabasePoll + @VisibleForTesting + static class PeriodicDatabasePoll implements DatabasePoll { /** * This future allows to wait until {@link #dataSourcesSnapshot} is initialized in the first {@link #poll()} @@ -113,7 +115,8 @@ private static class PeriodicDatabasePoll implements DatabasePoll * Represents on-demand {@link #poll} initiated at periods of time when SqlSegmentsMetadataManager doesn't poll the database * periodically. */ - private static class OnDemandDatabasePoll implements DatabasePoll + @VisibleForTesting + static class OnDemandDatabasePoll implements DatabasePoll { final long initiationTimeNanos = System.nanoTime(); final CompletableFuture pollCompletionFuture = new CompletableFuture<>(); @@ -433,7 +436,8 @@ private void useLatestIfWithinDelayOrPerformNewDatabasePoll() * This means that any method using this check can read from snapshot that is * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old. */ - private boolean useLatestSnapshotIfWithinDelay() + @VisibleForTesting + boolean useLatestSnapshotIfWithinDelay() { DatabasePoll latestDatabasePoll = this.latestDatabasePoll; if (latestDatabasePoll instanceof PeriodicDatabasePoll) { @@ -462,7 +466,8 @@ private boolean useLatestSnapshotIfWithinDelay() * This means that any method using this check can be sure that the latest poll for the snapshot was completed after * this method was called. 
*/ - public void forceOrWaitOngoingDatabasePoll() + @VisibleForTesting + void forceOrWaitOngoingDatabasePoll() { long checkStartTime = System.currentTimeMillis(); ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock(); @@ -929,6 +934,19 @@ public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments() return dataSourcesSnapshot; } + @VisibleForTesting + DataSourcesSnapshot getDataSourcesSnapshot() + { + return dataSourcesSnapshot; + } + + @VisibleForTesting + DatabasePoll getLatestDatabasePoll() + { + return latestDatabasePoll; + } + + @Override public Iterable iterateAllUsedSegments() { diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 6ca07b372ba7..6ef841595010 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -410,7 +410,7 @@ public Response getDatasourceLoadstatus( { final Interval theInterval; if (interval == null) { - long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000; + long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000; long currentTimeInMs = System.currentTimeMillis(); theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs); } else { @@ -428,10 +428,10 @@ public Response getDatasourceLoadstatus( if (!segments.isPresent()) { return logAndCreateDataSourceNotFoundResponse(dataSourceName); } - Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); if (simple != null) { // Calculate resposne for simple mode + Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); int numUnloadedSegments = 0; for (DataSegment segment : segments.get()) { if (!segmentLoadInfos.containsKey(segment.getId())) { @@ -448,14 +448,15 @@ public Response getDatasourceLoadstatus( // Calculate resposne for full mode final Map> underReplicationCountsPerDataSourcePerTier = 
new HashMap<>(); final List rules = metadataRuleManager.getRulesWithDefault(dataSourceName); - final Table segmentsInCluster = HashBasedTable.create();; + final Table segmentsInCluster = HashBasedTable.create(); final DateTime now = DateTimes.nowUtc(); for (DataSegment segment : segments.get()) { for (DruidServer druidServer : serverInventoryView.getInventory()) { String tier = druidServer.getTier(); SegmentId segmentId = segment.getId(); - if (druidServer.getDataSource(dataSourceName).getSegment(segmentId) != null) { + DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName); + if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) { Integer numReplicants = segmentsInCluster.get(segmentId, tier); if (numReplicants == null) { numReplicants = 0; @@ -464,14 +465,6 @@ public Response getDatasourceLoadstatus( } } } - segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( - dataSourceName, - theInterval, - false - ); - if (!segments.isPresent()) { - return logAndCreateDataSourceNotFoundResponse(dataSourceName); - } for (DataSegment segment : segments.get()) { for (final Rule rule : rules) { if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) { @@ -493,6 +486,7 @@ public Response getDatasourceLoadstatus( return Response.ok(underReplicationCountsPerDataSourcePerTier).build(); } else { // Calculate resposne for default mode + Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); int numUsedSegments = 0; int numUnloadedSegments = 0; for (DataSegment segment : segments.get()) { diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index be6354a63c75..57df44073460 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java 
@@ -20,11 +20,13 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -43,6 +45,7 @@ import org.junit.Test; import java.io.IOException; +import java.util.Set; import java.util.stream.Collectors; @@ -117,7 +120,7 @@ public void setUp() throws Exception { TestDerbyConnector connector = derbyConnectorRule.getConnector(); SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(); - config.setPollDuration(Period.seconds(1)); + config.setPollDuration(Period.seconds(3)); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), @@ -148,30 +151,124 @@ public void teardown() } @Test - public void testPoll() + public void testPollPeriodically() { + DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertNull(dataSourcesSnapshot); sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); - sqlSegmentsMetadataManager.poll(); Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + // This call make sure that the first poll is completed + sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay(); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertEquals( ImmutableSet.of("wikipedia"), sqlSegmentsMetadataManager.retrieveAllDataSourceNames() ); 
Assert.assertEquals( ImmutableList.of("wikipedia"), - sqlSegmentsMetadataManager - .getImmutableDataSourcesWithAllUsedSegments() - .stream() - .map(ImmutableDruidDataSource::getName) - .collect(Collectors.toList()) + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) ); Assert.assertEquals( ImmutableSet.of(segment1, segment2), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia").getSegments()) + ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments()) ); Assert.assertEquals( ImmutableSet.of(segment1, segment2), - ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot()) + ); + } + + @Test + public void testPollOnDemand() + { + DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertNull(dataSourcesSnapshot); + // This should return false and not wait/poll anything as we did not schedule periodic poll + Assert.assertFalse(sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay()); + Assert.assertNull(dataSourcesSnapshot); + // This call will force on demand poll + sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll(); + Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableSet.of("wikipedia"), + sqlSegmentsMetadataManager.retrieveAllDataSourceNames() + ); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) + ); + Assert.assertEquals( + 
ImmutableSet.of(segment1, segment2), + ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments()) + ); + Assert.assertEquals( + ImmutableSet.of(segment1, segment2), + ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot()) + ); + } + + @Test(timeout = 60_000) + public void testPollPeriodicallyAndOnDemandInterleave() throws Exception + { + DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertNull(dataSourcesSnapshot); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + // This call make sure that the first poll is completed + sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay(); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) + ); + final String newDataSource2 = "wikipedia2"; + final DataSegment newSegment2 = createNewSegment1(newDataSource2); + publisher.publishSegment(newSegment2); + + // This call will force on demand poll + sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll); + // New datasource should now be in the snapshot since we just force on demand poll. 
+ dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableList.of("wikipedia2", "wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) + ); + + final String newDataSource3 = "wikipedia3"; + final DataSegment newSegment3 = createNewSegment1(newDataSource3); + publisher.publishSegment(newSegment3); + + // This time wait for periodic poll (not doing on demand poll so we have to wait a bit...) + while (sqlSegmentsMetadataManager.getDataSourcesSnapshot().getDataSource(newDataSource3) == null) { + Thread.sleep(1000); + } + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); + dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); + Assert.assertEquals( + ImmutableList.of("wikipedia2", "wikipedia3", "wikipedia"), + dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() + .stream() + .map(ImmutableDruidDataSource::getName) + .collect(Collectors.toList()) ); } @@ -749,4 +846,46 @@ public void testStopAndStart() sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); sqlSegmentsMetadataManager.stopPollingDatabasePeriodically(); } + + @Test + public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() throws Exception + { + final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000"); + Optional> segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + "wikipedia", theInterval, true + ); + Assert.assertTrue(segments.isPresent()); + Set dataSegmentSet = ImmutableSet.copyOf(segments.get()); + Assert.assertEquals(1, dataSegmentSet.size()); + Assert.assertTrue(dataSegmentSet.contains(segment1)); + + final DataSegment newSegment2 = createSegment( + 
"wikipedia", + "2012-03-16T00:00:00.000/2012-03-17T00:00:00.000", + "2017-10-15T20:19:12.565Z", + "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip", + 0 + ); + publisher.publishSegment(newSegment2); + + // New segment is not returned since we call without force poll + segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + "wikipedia", theInterval, false + ); + Assert.assertTrue(segments.isPresent()); + dataSegmentSet = ImmutableSet.copyOf(segments.get()); + Assert.assertEquals(1, dataSegmentSet.size()); + Assert.assertTrue(dataSegmentSet.contains(segment1)); + + // New segment is returned since we call with force poll + segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( + "wikipedia", theInterval, true + ); + Assert.assertTrue(segments.isPresent()); + dataSegmentSet = ImmutableSet.copyOf(segments.get()); + Assert.assertEquals(2, dataSegmentSet.size()); + Assert.assertTrue(dataSegmentSet.contains(segment1)); + Assert.assertTrue(dataSegmentSet.contains(newSegment2)); + } + } diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 10a7f97300c9..9fcfd05825f3 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -19,6 +19,7 @@ package org.apache.druid.server.http; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -1193,6 +1194,239 @@ public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments() Assert.assertNotNull(response.getEntity()); } + @Test + public void testGetDatasourceLoadstatusDefault() + { + DataSegment datasource1Segment1 = new DataSegment( + "datasource1", + 
Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 10 + ); + + DataSegment datasource1Segment2 = new DataSegment( + "datasource1", + Intervals.of("2010-01-22/P1D"), + "", + null, + null, + null, + null, + 0x9, + 20 + ); + DataSegment datasource2Segment1 = new DataSegment( + "datasource2", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 30 + ); + List segments = ImmutableList.of(datasource1Segment1, datasource1Segment2); + Map completedLoadInfoMap = ImmutableMap.of( + datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1), + datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2), + datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1) + ); + Map halfLoadedInfoMap = ImmutableMap.of( + datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1) + ); + + // Test when datasource fully loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView); + + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(1, ((Map) response.getEntity()).size()); + Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1")); + Assert.assertEquals(100.0, ((Map) response.getEntity()).get("datasource1")); + EasyMock.verify(segmentsMetadataManager, inventoryView); + EasyMock.reset(segmentsMetadataManager, inventoryView); + + // Test when datasource 
half loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView); + + dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(1, ((Map) response.getEntity()).size()); + Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1")); + Assert.assertEquals(50.0, ((Map) response.getEntity()).get("datasource1")); + EasyMock.verify(segmentsMetadataManager, inventoryView); + } + + @Test + public void testGetDatasourceLoadstatusSimple() + { + DataSegment datasource1Segment1 = new DataSegment( + "datasource1", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 10 + ); + + DataSegment datasource1Segment2 = new DataSegment( + "datasource1", + Intervals.of("2010-01-22/P1D"), + "", + null, + null, + null, + null, + 0x9, + 20 + ); + DataSegment datasource2Segment1 = new DataSegment( + "datasource2", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 30 + ); + List segments = ImmutableList.of(datasource1Segment1, datasource1Segment2); + Map completedLoadInfoMap = ImmutableMap.of( + datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1), + datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2), + datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1) + ); + Map halfLoadedInfoMap = ImmutableMap.of( + datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1) + ); + + // 
Test when datasource fully loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView); + + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, "simple", null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(1, ((Map) response.getEntity()).size()); + Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1")); + Assert.assertEquals(0, ((Map) response.getEntity()).get("datasource1")); + EasyMock.verify(segmentsMetadataManager, inventoryView); + EasyMock.reset(segmentsMetadataManager, inventoryView); + + // Test when datasource half loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView); + + dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, "simple", null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(1, ((Map) response.getEntity()).size()); + Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1")); + Assert.assertEquals(1, ((Map) 
response.getEntity()).get("datasource1")); + EasyMock.verify(segmentsMetadataManager, inventoryView); + } + + @Test + public void testGetDatasourceLoadstatusFull() + { + DataSegment datasource1Segment1 = new DataSegment( + "datasource1", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 10 + ); + + DataSegment datasource1Segment2 = new DataSegment( + "datasource1", + Intervals.of("2010-01-22/P1D"), + "", + null, + null, + null, + null, + 0x9, + 20 + ); + DataSegment datasource2Segment1 = new DataSegment( + "datasource2", + Intervals.of("2010-01-01/P1D"), + "", + null, + null, + null, + null, + 0x9, + 30 + ); + DruidServer server1 = new DruidServer("server1", "host1", null, 1234, ServerType.HISTORICAL, "tier1", 0); + server1.addDataSegment(datasource1Segment1); + server1.addDataSegment(datasource1Segment2); + server1.addDataSegment(datasource2Segment1); + DruidServer server2 = new DruidServer("server2", "host2", null, 1234, ServerType.HISTORICAL, "tier2", 0); + server2.addDataSegment(datasource1Segment2); + server2.addDataSegment(datasource2Segment1); + DruidServer server3 = new DruidServer("server3", "host3", null, 1234, ServerType.HISTORICAL, "tier1", 0); + server3.addDataSegment(datasource1Segment1); + server3.addDataSegment(datasource1Segment2); + server3.addDataSegment(datasource2Segment1); + + List segments = ImmutableList.of(datasource1Segment1, datasource1Segment2); + + // Test when datasource fully loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server1, server2, server3)).times(2); + MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); + Rule loadRule = new 
IntervalLoadRule(Intervals.of("2010-01-01T00:00:00Z/2012-01-03T00:00:00Z"), ImmutableMap.of("tier1", 2, "tier2", 2)); + Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z")); + EasyMock.expect(databaseRuleManager.getRulesWithDefault("datasource1")) + .andReturn(ImmutableList.of(loadRule, dropRule)) + .once(); + EasyMock.replay(segmentsMetadataManager, inventoryView, databaseRuleManager); + + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, databaseRuleManager, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, "full"); + Assert.assertEquals(200, response.getStatus()); + Assert.assertNotNull(response.getEntity()); + Assert.assertEquals(2, ((Map) response.getEntity()).size()); + Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier1")).size()); + Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier2")).size()); + Assert.assertEquals(0L, ((Map) ((Map) response.getEntity()).get("tier1")).get("datasource1")); + Assert.assertEquals(3L, ((Map) ((Map) response.getEntity()).get("tier2")).get("datasource1")); + EasyMock.verify(segmentsMetadataManager, inventoryView, databaseRuleManager); + } + private DruidServerMetadata createRealtimeServerMetadata(String name) { return createServerMetadata(name, ServerType.REALTIME); From 2270a5bec6985111be1d464b96bf23f7962a0d58 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 9 Jun 2020 22:43:15 -1000 Subject: [PATCH 10/22] API to verify a datasource has the latest ingested data --- docs/operations/api-reference.md | 6 +++--- .../apache/druid/server/http/DataSourcesResource.java | 10 +++++++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 10ddec9ae237..9c1a63e73690 100644 --- a/docs/operations/api-reference.md +++ 
b/docs/operations/api-reference.md @@ -117,9 +117,6 @@ Returns the serialized JSON of segments to load and drop for each Historical pro #### Segment Loading for Datasource -Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` -(e.g., 2016-06-27_2016-06-28). - These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query. An example workflow for this is: 1. Submit your ingestion task @@ -136,18 +133,21 @@ Continue polling until all segments are loaded. Once all segments are loaded you Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource over the given interval (or last 2 weeks if interval is not given). Setting forceMetadataRefresh to true will force the coordinator to poll latest segment metadata from the metadatastore. forceMetadataRefresh will be set to true if not given. +If no used segments found for the given inputs, this API returns 100% as the value. * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource over the given interval (or last 2 weeks if interval is not given). This does not include replication. Setting forceMetadataRefresh to true will force the coordinator to poll latest segment metadata from the metadatastore. forceMetadataRefresh will be set to true if not given. +If no used segments found for the given inputs, this API returns 0 as the value. 
* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource over the given interval (or last 2 weeks if interval is not given). This includes replication. Setting forceMetadataRefresh to true will force the coordinator to poll latest segment metadata from the metadatastore. forceMetadataRefresh will be set to true if not given. +If no used segments found for the given inputs, this API returns empty map. #### Metadata store information diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 6ef841595010..c89ab6d2d0d9 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -414,7 +414,7 @@ public Response getDatasourceLoadstatus( long currentTimeInMs = System.currentTimeMillis(); theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs); } else { - theInterval = Intervals.of(interval); + theInterval = Intervals.of(interval.replace('_', '/')); } boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? 
true : forceMetadataRefresh; @@ -495,6 +495,14 @@ public Response getDatasourceLoadstatus( numUnloadedSegments++; } } + if (numUsedSegments == 0) { + return Response.ok( + ImmutableMap.of( + dataSourceName, + 100 + ) + ).build(); + } return Response.ok( ImmutableMap.of( dataSourceName, From 172ef6ac47e1566ad889d937bd8f12ad170bf4ff Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 10 Jun 2020 02:19:34 -1000 Subject: [PATCH 11/22] fix spelling --- docs/operations/api-reference.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 9c1a63e73690..6250a845eff1 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -120,10 +120,10 @@ Returns the serialized JSON of segments to load and drop for each Historical pro These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query. An example workflow for this is: 1. Submit your ingestion task -2. Repeatedly poll Overlod's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded. -3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with forceMetadataRefresh=true once. +2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded. +3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. -4. Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with forceMetadataRefresh=false. +4. 
Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. ##### GET @@ -131,22 +131,22 @@ Continue polling until all segments are loaded. Once all segments are loaded you * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}` Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource -over the given interval (or last 2 weeks if interval is not given). Setting forceMetadataRefresh to true -will force the coordinator to poll latest segment metadata from the metadatastore. forceMetadataRefresh will be set to true if not given. +over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh=true` +will force the coordinator to poll latest segment metadata from the metadata store. `forceMetadataRefresh` will be set to true if not given. If no used segments found for the given inputs, this API returns 100% as the value. * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This does not include replication. Setting forceMetadataRefresh to true -will force the coordinator to poll latest segment metadata from the metadatastore. forceMetadataRefresh will be set to true if not given. +over the given interval (or last 2 weeks if interval is not given). This does not include replication. Setting `forceMetadataRefresh=true` +will force the coordinator to poll latest segment metadata from the metadata store. 
`forceMetadataRefresh` will be set to true if not given. If no used segments found for the given inputs, this API returns 0 as the value. * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This includes replication. Setting forceMetadataRefresh to true -will force the coordinator to poll latest segment metadata from the metadatastore. forceMetadataRefresh will be set to true if not given. +over the given interval (or last 2 weeks if interval is not given). This includes replication. Setting `forceMetadataRefresh=true` +will force the coordinator to poll latest segment metadata from the metadata store. `forceMetadataRefresh` will be set to true if not given. If no used segments found for the given inputs, this API returns empty map. #### Metadata store information From 6d4db3164bbf3aa06f511426b6208f8273901e0f Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 12 Jun 2020 18:21:49 -1000 Subject: [PATCH 12/22] address comments --- docs/operations/api-reference.md | 32 ++-- .../metadata/SegmentsMetadataManager.java | 4 +- .../server/coordinator/DruidCoordinator.java | 16 +- .../server/http/DataSourcesResource.java | 155 ++++++++++-------- 4 files changed, 113 insertions(+), 94 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 6250a845eff1..cc5a98fc33be 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment * `/druid/coordinator/v1/loadstatus?simple` -Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. 
This does not include replication. +Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replica of segments. * `/druid/coordinator/v1/loadstatus?full` -Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes replication. +Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This include replica of segments. * `/druid/coordinator/v1/loadqueue` @@ -115,15 +115,15 @@ Returns the number of segments to load and drop, as well as the total segment lo Returns the serialized JSON of segments to load and drop for each Historical process. -#### Segment Loading for Datasource +#### Segment Loading by Datasource -These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query. +You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following APIs. An example workflow for this is: -1. Submit your ingestion task -2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded. -3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. +1. Submit your ingestion task. +2. Repeatedly poll the Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. +3. Poll the datasource loadstatus API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. -4. 
Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. +4. Repeatedly poll the datasource loadstatus API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. ##### GET @@ -131,23 +131,21 @@ Continue polling until all segments are loaded. Once all segments are loaded you * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}` Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource -over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh=true` -will force the coordinator to poll latest segment metadata from the metadata store. `forceMetadataRefresh` will be set to true if not given. -If no used segments found for the given inputs, this API returns 100% as the value. +over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh` to true (the default) +will force the coordinator to poll latest segment metadata from the metadata store. If all segments have been loaded or +no used segments are found for the given inputs, this API returns 100% as the value. * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This does not include replication. Setting `forceMetadataRefresh=true` -will force the coordinator to poll latest segment metadata from the metadata store. `forceMetadataRefresh` will be set to true if not given. 
-If no used segments found for the given inputs, this API returns 0 as the value. +over the given interval (or last 2 weeks if interval is not given). This does not include replica of segments. Setting `forceMetadataRefresh` to true (the default) +will force the coordinator to poll latest segment metadata from the metadata store. If no used segments found for the given inputs, this API returns 0 as the value. * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This includes replication. Setting `forceMetadataRefresh=true` -will force the coordinator to poll latest segment metadata from the metadata store. `forceMetadataRefresh` will be set to true if not given. -If no used segments found for the given inputs, this API returns empty map. +over the given interval (or last 2 weeks if interval is not given). This include replica of segments. Setting `forceMetadataRefresh` to true (the default) +will force the coordinator to poll latest segment metadata from the metadata store. If no used segments found for the given inputs, this API returns empty map. #### Metadata store information diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 6e640a097990..889141a89c15 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -116,7 +116,9 @@ int markAsUsedNonOvershadowedSegments(String dataSource, Set segmentIds) /** * Returns an iterable to go over all used and non-overshadowed segments of given data sources over given interval. 
- * The order in which segments are iterated is unspecified. + * The order in which segments are iterated is unspecified. Note: the iteration may not be as trivially cheap as, + * for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it + * iterates the returned iterable only once rather than several times. * If {@param requiresLatest} is true then a force metadatastore poll will be triggered. This can cause a longer * response time but will ensure that the latest segment information (at the time this method is called) is returned. * If {@param requiresLatest} is false then segment information from stale snapshot of up to the last periodic poll diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index f8c3f43c76f5..36193d3cbe4b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -256,6 +256,14 @@ public Map getLoadManagementPeons() * @return tier -> { dataSource -> underReplicationCount } map */ public Map> computeUnderReplicationCountsPerDataSourcePerTier() + { + final Iterable dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); + return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments); + } + + public Map> computeUnderReplicationCountsPerDataSourcePerTierForSegments( + Iterable dataSegments + ) { final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); @@ -263,8 +271,6 @@ public Map> computeUnderReplicationCountsPerDataS return underReplicationCountsPerDataSourcePerTier; } - final Iterable dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); - final DateTime now = DateTimes.nowUtc(); for (final DataSegment segment : dataSegments) { @@ -320,7 +326,7 @@ public Map getLoadStatus() for 
(ImmutableDruidDataSource dataSource : dataSources) { final Set segments = Sets.newHashSet(dataSource.getSegments()); - final int numUsedSegments = segments.size(); + final int numPublishedSegments = segments.size(); // remove loaded segments for (DruidServer druidServer : serverInventoryView.getInventory()) { @@ -333,10 +339,10 @@ public Map getLoadStatus() } } } - final int numUnloadedSegments = segments.size(); + final int numUnavailableSegments = segments.size(); loadStatus.put( dataSource.getName(), - 100 * ((double) (numUsedSegments - numUnloadedSegments) / (double) numUsedSegments) + 100 * ((double) (numPublishedSegments - numUnavailableSegments) / (double) numPublishedSegments) ); } diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index c89ab6d2d0d9..06e6cf567515 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -32,6 +32,7 @@ import com.sun.jersey.spi.container.ResourceFilters; import it.unimi.dsi.fastutil.objects.Object2LongMap; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; +import org.apache.commons.collections4.IterableUtils; import org.apache.commons.lang.StringUtils; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; @@ -54,6 +55,8 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordinator.DruidCoordinator; +import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.http.security.DatasourceResourceFilter; @@ -107,6 +110,7 @@ public class DataSourcesResource 
private final MetadataRuleManager metadataRuleManager; private final IndexingServiceClient indexingServiceClient; private final AuthorizerMapper authorizerMapper; + private final DruidCoordinator coordinator; @Inject public DataSourcesResource( @@ -114,7 +118,8 @@ public DataSourcesResource( SegmentsMetadataManager segmentsMetadataManager, MetadataRuleManager metadataRuleManager, @Nullable IndexingServiceClient indexingServiceClient, - AuthorizerMapper authorizerMapper + AuthorizerMapper authorizerMapper, + DruidCoordinator coordinator ) { this.serverInventoryView = serverInventoryView; @@ -122,6 +127,7 @@ public DataSourcesResource( this.metadataRuleManager = metadataRuleManager; this.indexingServiceClient = indexingServiceClient; this.authorizerMapper = authorizerMapper; + this.coordinator = coordinator; } @GET @@ -402,12 +408,18 @@ public Response getServedSegmentsInInterval( @ResourceFilters(DatasourceResourceFilter.class) public Response getDatasourceLoadstatus( @PathParam("dataSourceName") String dataSourceName, + @QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh, @QueryParam("interval") @Nullable final String interval, - @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh, @QueryParam("simple") @Nullable final String simple, @QueryParam("full") @Nullable final String full ) { + if (forceMetadataRefresh == null) { + return Response + .status(Response.Status.BAD_REQUEST) + .entity("Invalid request. forceMetadataRefresh must be specified") + .build(); + } final Interval theInterval; if (interval == null) { long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000; @@ -417,101 +429,102 @@ public Response getDatasourceLoadstatus( theInterval = Intervals.of(interval.replace('_', '/')); } - boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? 
true : forceMetadataRefresh; - Optional> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval( dataSourceName, theInterval, - requiresMetadataStorePoll + forceMetadataRefresh ); if (!segments.isPresent()) { return logAndCreateDataSourceNotFoundResponse(dataSourceName); } + if (IterableUtils.size(segments.get()) == 0) { + return Response + .status(Response.Status.NO_CONTENT) + .entity("No used segment found for the given datasource and interval") + .build(); + } + if (simple != null) { - // Calculate resposne for simple mode - Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); - int numUnloadedSegments = 0; - for (DataSegment segment : segments.get()) { - if (!segmentLoadInfos.containsKey(segment.getId())) { - numUnloadedSegments++; - } - } + // Calculate response for simple mode + SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get()); return Response.ok( ImmutableMap.of( dataSourceName, - numUnloadedSegments + segmentsLoadStatistics.getNumUnavailableSegments() ) ).build(); } else if (full != null) { - // Calculate resposne for full mode - final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); - final List rules = metadataRuleManager.getRulesWithDefault(dataSourceName); - final Table segmentsInCluster = HashBasedTable.create(); - final DateTime now = DateTimes.nowUtc(); - - for (DataSegment segment : segments.get()) { - for (DruidServer druidServer : serverInventoryView.getInventory()) { - String tier = druidServer.getTier(); - SegmentId segmentId = segment.getId(); - DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName); - if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) { - Integer numReplicants = segmentsInCluster.get(segmentId, tier); - if (numReplicants == null) { - numReplicants = 0; - } - segmentsInCluster.put(segmentId, tier, numReplicants + 1); - } - } + // Calculate response for full mode + Map> 
segmentLoadMap + = coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get()); + if (segmentLoadMap.isEmpty()) { + return Response.serverError() + .entity("Coordinator segment replicant lookup is not initialized yet. Try again later.") + .build(); } - for (DataSegment segment : segments.get()) { - for (final Rule rule : rules) { - if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) { - continue; - } - ((LoadRule) rule) - .getTieredReplicants() - .forEach((final String tier, final Integer ruleReplicants) -> { - Integer currentReplicantsRetVal = segmentsInCluster.get(segment.getId(), tier); - int currentReplicants = currentReplicantsRetVal == null ? 0 : currentReplicantsRetVal; - Object2LongMap underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier - .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>()); - ((Object2LongOpenHashMap) underReplicationPerDataSource) - .addTo(dataSourceName, Math.max(ruleReplicants - currentReplicants, 0)); - }); - break; // only the first matching rule applies - } - } - return Response.ok(underReplicationCountsPerDataSourcePerTier).build(); + return Response.ok(segmentLoadMap).build(); } else { - // Calculate resposne for default mode - Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); - int numUsedSegments = 0; - int numUnloadedSegments = 0; - for (DataSegment segment : segments.get()) { - numUsedSegments++; - if (!segmentLoadInfos.containsKey(segment.getId())) { - numUnloadedSegments++; - } - } - if (numUsedSegments == 0) { - return Response.ok( - ImmutableMap.of( - dataSourceName, - 100 - ) - ).build(); - } + // Calculate response for default mode + SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get()); return Response.ok( ImmutableMap.of( dataSourceName, - 100 * ((double) (numUsedSegments - numUnloadedSegments) / (double) numUsedSegments) + 100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) / 
(double) segmentsLoadStatistics.getNumPublishedSegments()) ) ).build(); } } + private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable segments) { + Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); + int numPublishedSegments = 0; + int numUnavailableSegments = 0; + int numLoadedSegments = 0; + for (DataSegment segment : segments) { + numPublishedSegments++; + if (!segmentLoadInfos.containsKey(segment.getId())) { + numUnavailableSegments++; + } else { + numLoadedSegments++; + } + } + return new SegmentsLoadStatistics(numPublishedSegments, numUnavailableSegments, numLoadedSegments); + } + + private static class SegmentsLoadStatistics + { + private int numPublishedSegments; + private int numUnavailableSegments; + private int numLoadedSegments; + + SegmentsLoadStatistics( + int numPublishedSegments, + int numUnavailableSegments, + int numLoadedSegments + ) + { + this.numPublishedSegments = numPublishedSegments; + this.numUnavailableSegments = numUnavailableSegments; + this.numLoadedSegments = numLoadedSegments; + } + + public int getNumPublishedSegments() + { + return numPublishedSegments; + } + + public int getNumUnavailableSegments() + { + return numUnavailableSegments; + } + + public int getNumLoadedSegments() + { + return numLoadedSegments; + } + } /** * The property names belong to the public HTTP JSON API. 
From cb39dd2e4e5f3aa5589e67ff84edf10d0afda109 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 12 Jun 2020 18:28:53 -1000 Subject: [PATCH 13/22] fix checkstyle --- .../org/apache/druid/server/http/DataSourcesResource.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 06e6cf567515..3f6ff6196f8f 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -24,14 +24,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Throwables; -import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Table; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import it.unimi.dsi.fastutil.objects.Object2LongMap; -import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.commons.collections4.IterableUtils; import org.apache.commons.lang.StringUtils; import org.apache.druid.client.CoordinatorServerView; @@ -56,7 +53,6 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordinator.DruidCoordinator; -import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.http.security.DatasourceResourceFilter; @@ -477,7 +473,8 @@ public Response getDatasourceLoadstatus( } } - private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable segments) { + private SegmentsLoadStatistics 
computeSegmentLoadStatistics(Iterable segments) + { Map segmentLoadInfos = serverInventoryView.getSegmentLoadInfos(); int numPublishedSegments = 0; int numUnavailableSegments = 0; From 409a3926a21c8cbe8deb7a5c8b620d99d93cdfd1 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 12 Jun 2020 19:01:16 -1000 Subject: [PATCH 14/22] update docs --- docs/operations/api-reference.md | 25 +++--- .../server/http/DataSourcesResourceTest.java | 76 +++++++++---------- 2 files changed, 53 insertions(+), 48 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index cc5a98fc33be..f300f98d7e90 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -121,31 +121,36 @@ You can verify if segments created by a recent ingestion task are loaded onto hi An example workflow for this is: 1. Submit your ingestion task. 2. Repeatedly poll the Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. -3. Poll the datasource loadstatus API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. +3. Poll the below datasource loadstatus API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. -4. Repeatedly poll the datasource loadstatus API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. +4. Repeatedly poll the below datasource loadstatus API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. 
##### GET * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}` -Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource -over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh` to true (the default) -will force the coordinator to poll latest segment metadata from the metadata store. If all segments have been loaded or -no used segments are found for the given inputs, this API returns 100% as the value. +Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given +datasource over the given interval (or last 2 weeks if interval is not given). `forceMetadataRefresh` is required to be set. +Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. +Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. +If no used segments are found for the given inputs, this API returns `204 No Content` * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This does not include replica of segments. Setting `forceMetadataRefresh` to true (the default) -will force the coordinator to poll latest segment metadata from the metadata store. If no used segments found for the given inputs, this API returns 0 as the value. +over the given interval (or last 2 weeks if interval is not given). This does not include replica of segments. `forceMetadataRefresh` is required to be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. +Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. +If no used segments are found for the given inputs, this API returns `204 No Content` * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This include replica of segments. Setting `forceMetadataRefresh` to true (the default) -will force the coordinator to poll latest segment metadata from the metadata store. If no used segments found for the given inputs, this API returns empty map. +over the given interval (or last 2 weeks if interval is not given). This include replica of segments. `forceMetadataRefresh` is required to be set. +Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. +Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. 
+If no used segments are found for the given inputs, this API returns `204 No Content` #### Metadata store information diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index 9fcfd05825f3..eb47c55c8236 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -177,7 +177,7 @@ public void testGetFullQueryableDataSources() EasyMock.replay(inventoryView, server, request); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER); + new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null); Response response = dataSourcesResource.getQueryableDataSources("full", null, request); Set result = (Set) response.getEntity(); Assert.assertEquals(200, response.getStatus()); @@ -251,7 +251,7 @@ public Access authorize(AuthenticationResult authenticationResult1, Resource res } }; - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper, null); Response response = dataSourcesResource.getQueryableDataSources("full", null, request); Set result = (Set) response.getEntity(); @@ -290,7 +290,7 @@ public void testGetSimpleQueryableDataSources() EasyMock.replay(inventoryView, server, request); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER); + new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null); Response response = dataSourcesResource.getQueryableDataSources(null, "simple", request); Assert.assertEquals(200, 
response.getStatus()); List> results = (List>) response.getEntity(); @@ -314,7 +314,7 @@ public void testFullGetTheDataSource() EasyMock.replay(inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", "full"); ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity(); Assert.assertEquals(200, response.getStatus()); @@ -330,7 +330,7 @@ public void testNullGetTheDataSource() EasyMock.replay(inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Assert.assertEquals(204, dataSourcesResource.getDataSource("none", null).getStatus()); EasyMock.verify(inventoryView, server); } @@ -348,7 +348,7 @@ public void testSimpleGetTheDataSource() EasyMock.replay(inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map> result = (Map>) response.getEntity(); @@ -381,7 +381,7 @@ public void testSimpleGetTheDataSourceManyTiers() EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server, server2, server3)).atLeastOnce(); EasyMock.replay(inventoryView, server, server2, server3); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", null); Assert.assertEquals(200, 
response.getStatus()); Map> result = (Map>) response.getEntity(); @@ -419,7 +419,7 @@ public void testSimpleGetTheDataSourceWithReplicatedSegments() EasyMock.replay(inventoryView); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); Map> result1 = (Map>) response.getEntity(); @@ -464,7 +464,7 @@ public void testGetSegmentDataSourceIntervals() expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z")); expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z")); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getIntervalsWithServedSegmentsOrAllServedSegmentsPerIntervals( "invalidDataSource", @@ -524,7 +524,7 @@ public void testGetServedSegmentsInIntervalInDataSource() EasyMock.replay(inventoryView); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, null, null); + new DataSourcesResource(inventoryView, null, null, null, null, null); Response response = dataSourcesResource.getServedSegmentsInInterval( "invalidDataSource", "2010-01-01/P1D", @@ -594,7 +594,7 @@ public void testKillSegmentsInIntervalInDataSource() EasyMock.replay(indexingServiceClient, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null); + new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null); Response response = dataSourcesResource.killUnusedSegmentsInInterval("datasource1", interval); Assert.assertEquals(200, response.getStatus()); 
@@ -608,7 +608,7 @@ public void testMarkAsUnusedAllSegmentsInDataSource() IndexingServiceClient indexingServiceClient = EasyMock.createStrictMock(IndexingServiceClient.class); EasyMock.replay(indexingServiceClient, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null); + new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null); try { Response response = dataSourcesResource.markAsUnusedAllSegmentsOrKillUnusedSegmentsInInterval("datasource", "true", "???"); @@ -631,7 +631,7 @@ public void testIsHandOffComplete() Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null); Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z")); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null); + new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null, null); // test dropped EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1")) @@ -700,7 +700,7 @@ public void testMarkSegmentAsUsed() EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(true).once(); EasyMock.replay(segmentsMetadataManager); - DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString()); Assert.assertEquals(200, response.getStatus()); @@ -714,7 +714,7 @@ public void testMarkSegmentAsUsedNoChange() EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(false).once(); EasyMock.replay(segmentsMetadataManager); - DataSourcesResource dataSourcesResource = new 
DataSourcesResource(null, segmentsMetadataManager, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString()); Assert.assertEquals(200, response.getStatus()); @@ -735,7 +735,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInterval() EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -758,7 +758,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsIntervalNoneUpdated() EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -781,7 +781,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsSet() throws UnknownSegmentIdsE EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -804,7 +804,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsIntervalException() EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, 
segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -822,7 +822,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource() EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -836,7 +836,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource() public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadNoArguments() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -849,7 +849,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadNoArguments() public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadBothArguments() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -862,7 +862,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadBothArguments() public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadEmptyArray() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, 
segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", @@ -875,7 +875,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadEmptyArray() public void testMarkAsUsedNonOvershadowedSegmentsNoPayload() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null); Assert.assertEquals(400, response.getStatus()); @@ -1027,7 +1027,7 @@ public void testMarkSegmentsAsUnused() new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity()); @@ -1050,7 +1050,7 @@ public void testMarkSegmentsAsUnusedNoChanges() new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -1075,7 +1075,7 @@ public void testMarkSegmentsAsUnusedException() new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds); DataSourcesResource dataSourcesResource = - 
new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(500, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1097,7 +1097,7 @@ public void testMarkAsUnusedSegmentsInInterval() new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity()); @@ -1120,7 +1120,7 @@ public void testMarkAsUnusedSegmentsInIntervalNoChanges() new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -1144,7 +1144,7 @@ public void testMarkAsUnusedSegmentsInIntervalException() new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload); 
Assert.assertEquals(500, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1155,7 +1155,7 @@ public void testMarkAsUnusedSegmentsInIntervalException() public void testMarkSegmentsAsUnusedNullPayload() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", null); Assert.assertEquals(400, response.getStatus()); @@ -1170,7 +1170,7 @@ public void testMarkSegmentsAsUnusedNullPayload() public void testMarkSegmentsAsUnusedInvalidPayload() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); final DataSourcesResource.MarkDataSourceSegmentsPayload payload = new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null); @@ -1184,7 +1184,7 @@ public void testMarkSegmentsAsUnusedInvalidPayload() public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments() { DataSourcesResource dataSourcesResource = - new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); final DataSourcesResource.MarkDataSourceSegmentsPayload payload = new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"), ImmutableSet.of()); @@ -1247,7 +1247,7 @@ public void testGetDatasourceLoadstatusDefault() EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once(); EasyMock.replay(segmentsMetadataManager, inventoryView); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + DataSourcesResource dataSourcesResource = new 
DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1263,7 +1263,7 @@ public void testGetDatasourceLoadstatusDefault() EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once(); EasyMock.replay(segmentsMetadataManager, inventoryView); - dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1326,7 +1326,7 @@ public void testGetDatasourceLoadstatusSimple() EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once(); EasyMock.replay(segmentsMetadataManager, inventoryView); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, "simple", null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1342,7 +1342,7 @@ public void testGetDatasourceLoadstatusSimple() EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once(); EasyMock.replay(segmentsMetadataManager, inventoryView); - dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null); + dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, 
null, null, null, null); response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, "simple", null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1415,7 +1415,7 @@ public void testGetDatasourceLoadstatusFull() .once(); EasyMock.replay(segmentsMetadataManager, inventoryView, databaseRuleManager); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, databaseRuleManager, null, null); + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, databaseRuleManager, null, null, null); Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, "full"); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); From bb803cd946dc94d0a8a362c84bf42ce66f3ba9b9 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Fri, 12 Jun 2020 19:48:30 -1000 Subject: [PATCH 15/22] fix tests --- .../server/coordinator/DruidCoordinator.java | 3 + .../server/http/DataSourcesResourceTest.java | 91 +++++++++++-------- 2 files changed, 56 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 36193d3cbe4b..7ef2f66f2339 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -261,6 +261,9 @@ public Map> computeUnderReplicationCountsPerDataS return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments); } + /** + * @return tier -> { dataSource -> underReplicationCount } map + */ public Map> computeUnderReplicationCountsPerDataSourcePerTierForSegments( Iterable dataSegments ) diff --git 
a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index eb47c55c8236..39e02ae86def 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; @@ -40,6 +42,7 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.rules.IntervalDropRule; import org.apache.druid.server.coordinator.rules.IntervalLoadRule; import org.apache.druid.server.coordinator.rules.Rule; @@ -1194,6 +1197,36 @@ public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments() Assert.assertNotNull(response.getEntity()); } + @Test + public void testGetDatasourceLoadstatusForceMetadataRefreshNull() + { + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testGetDatasourceLoadstatusNoSegmentForInterval() + { + List segments = ImmutableList.of(); + // Test when datasource fully loaded + EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq( + 
"datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) + .andReturn(Optional.of(segments)).once(); + EasyMock.replay(segmentsMetadataManager); + + DataSourcesResource dataSourcesResource = new DataSourcesResource( + inventoryView, + segmentsMetadataManager, + null, + null, + null, + null + ); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null); + Assert.assertEquals(204, response.getStatus()); + } + @Test public void testGetDatasourceLoadstatusDefault() { @@ -1248,7 +1281,7 @@ public void testGetDatasourceLoadstatusDefault() EasyMock.replay(segmentsMetadataManager, inventoryView); DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); - Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1264,7 +1297,7 @@ public void testGetDatasourceLoadstatusDefault() EasyMock.replay(segmentsMetadataManager, inventoryView); dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); - response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null); + response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1327,7 +1360,7 @@ public void testGetDatasourceLoadstatusSimple() EasyMock.replay(segmentsMetadataManager, inventoryView); DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, 
null); - Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, "simple", null); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1343,7 +1376,7 @@ public void testGetDatasourceLoadstatusSimple() EasyMock.replay(segmentsMetadataManager, inventoryView); dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null); - response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, "simple", null); + response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(1, ((Map) response.getEntity()).size()); @@ -1378,45 +1411,27 @@ public void testGetDatasourceLoadstatusFull() 0x9, 20 ); - DataSegment datasource2Segment1 = new DataSegment( - "datasource2", - Intervals.of("2010-01-01/P1D"), - "", - null, - null, - null, - null, - 0x9, - 30 - ); - DruidServer server1 = new DruidServer("server1", "host1", null, 1234, ServerType.HISTORICAL, "tier1", 0); - server1.addDataSegment(datasource1Segment1); - server1.addDataSegment(datasource1Segment2); - server1.addDataSegment(datasource2Segment1); - DruidServer server2 = new DruidServer("server2", "host2", null, 1234, ServerType.HISTORICAL, "tier2", 0); - server2.addDataSegment(datasource1Segment2); - server2.addDataSegment(datasource2Segment1); - DruidServer server3 = new DruidServer("server3", "host3", null, 1234, ServerType.HISTORICAL, "tier1", 0); - server3.addDataSegment(datasource1Segment1); - server3.addDataSegment(datasource1Segment2); - server3.addDataSegment(datasource2Segment1); - List segments = ImmutableList.of(datasource1Segment1, datasource1Segment2); + final 
Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); + Object2LongMap tier1 = new Object2LongOpenHashMap<>(); + tier1.put("datasource1", 0L); + Object2LongMap tier2 = new Object2LongOpenHashMap<>(); + tier2.put("datasource1", 3L); + underReplicationCountsPerDataSourcePerTier.put("tier1", tier1); + underReplicationCountsPerDataSourcePerTier.put("tier2", tier2); + // Test when datasource fully loaded EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean())) .andReturn(Optional.of(segments)).once(); - EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server1, server2, server3)).times(2); - MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); - Rule loadRule = new IntervalLoadRule(Intervals.of("2010-01-01T00:00:00Z/2012-01-03T00:00:00Z"), ImmutableMap.of("tier1", 2, "tier2", 2)); - Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z")); - EasyMock.expect(databaseRuleManager.getRulesWithDefault("datasource1")) - .andReturn(ImmutableList.of(loadRule, dropRule)) - .once(); - EasyMock.replay(segmentsMetadataManager, inventoryView, databaseRuleManager); + DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class); + EasyMock.expect(druidCoordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments)) + .andReturn(underReplicationCountsPerDataSourcePerTier).once(); - DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, databaseRuleManager, null, null, null); - Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, "full"); + EasyMock.replay(segmentsMetadataManager, druidCoordinator); + + DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, 
null, druidCoordinator); + Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full"); Assert.assertEquals(200, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals(2, ((Map) response.getEntity()).size()); @@ -1424,7 +1439,7 @@ public void testGetDatasourceLoadstatusFull() Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier2")).size()); Assert.assertEquals(0L, ((Map) ((Map) response.getEntity()).get("tier1")).get("datasource1")); Assert.assertEquals(3L, ((Map) ((Map) response.getEntity()).get("tier2")).get("datasource1")); - EasyMock.verify(segmentsMetadataManager, inventoryView, databaseRuleManager); + EasyMock.verify(segmentsMetadataManager); } private DruidServerMetadata createRealtimeServerMetadata(String name) From 2470e8fa81e483839e8903a6f982c6108c230c8a Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 15 Jun 2020 10:22:12 -1000 Subject: [PATCH 16/22] fix doc --- docs/operations/api-reference.md | 4 ++-- .../org/apache/druid/server/http/DataSourcesResource.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index f300f98d7e90..006f93ceb11a 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -121,9 +121,9 @@ You can verify if segments created by a recent ingestion task are loaded onto hi An example workflow for this is: 1. Submit your ingestion task. 2. Repeatedly poll the Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. -3. Poll the below datasource loadstatus API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. +3. Poll the below `datasource loadstatus` API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 
If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. -4. Repeatedly poll the below datasource loadstatus API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. +4. Repeatedly poll the below `datasource loadstatus` API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. ##### GET diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 3f6ff6196f8f..95b8ecc450c4 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -29,7 +29,6 @@ import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import it.unimi.dsi.fastutil.objects.Object2LongMap; -import org.apache.commons.collections4.IterableUtils; import org.apache.commons.lang.StringUtils; import org.apache.druid.client.CoordinatorServerView; import org.apache.druid.client.DruidDataSource; @@ -435,7 +434,7 @@ public Response getDatasourceLoadstatus( return logAndCreateDataSourceNotFoundResponse(dataSourceName); } - if (IterableUtils.size(segments.get()) == 0) { + if (Iterables.size(segments.get()) == 0) { return Response .status(Response.Status.NO_CONTENT) .entity("No used segment found for the given datasource and interval") From cb9f160d0331574fe112ca2467a9518cff5302b5 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 15 Jun 2020 15:52:26 -1000 Subject: [PATCH 17/22] address comments --- docs/ingestion/faq.md | 12 ++++++++++++ docs/operations/api-reference.md | 17 ++++------------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md index 
1e6ffe10254f..2a2530f49629 100644 --- a/docs/ingestion/faq.md +++ b/docs/ingestion/faq.md @@ -66,6 +66,18 @@ Other common reasons that hand-off fails are as follows: Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, dependencies (that can be obtained by running command `hadoop classpath` on a machine where hadoop has been setup) in the classpath. And, provide necessary HDFS settings as described in [deep storage](../dependencies/deep-storage.md) . +## How do I know when I can make query to Druid after submitting ingestion task? + +You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following workflow. +1. Submit your ingestion task. +2. Repeatedly poll the [Overlord's task API](../operations/api-reference.html#tasks) ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. +3. Poll the [datasource loadstatus API](../operations/api-reference.html#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with +`forceMetadataRefresh=true` and `interval=` once. +If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. +4. Repeatedly poll the [datasource loadstatus API](../operations/api-reference.html#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with +`forceMetadataRefresh=false` and `interval=`. +Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. + ## I don't see my Druid segments on my Historical processes You can check the Coordinator console located at `:`. Make sure that your segments have actually loaded on [Historical processes](../design/historical.md). If your segments are not present, check the Coordinator logs for messages about capacity of replication errors. 
One reason that segments are not downloaded is because Historical processes have maxSizes that are too small, making them incapable of downloading more data. You can change that with (for example): diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 006f93ceb11a..16544d5e0bae 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment * `/druid/coordinator/v1/loadstatus?simple` -Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replica of segments. +Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include segment replication counts. * `/druid/coordinator/v1/loadstatus?full` -Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This include replica of segments. +Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts. * `/druid/coordinator/v1/loadqueue` @@ -117,15 +117,6 @@ Returns the serialized JSON of segments to load and drop for each Historical pro #### Segment Loading by Datasource -You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following APIs. -An example workflow for this is: -1. Submit your ingestion task. -2. Repeatedly poll the Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. -3. Poll the below `datasource loadstatus` API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 
-If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. -4. Repeatedly poll the below `datasource loadstatus` API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. -Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. - ##### GET * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}` @@ -139,7 +130,7 @@ If no used segments are found for the given inputs, this API returns `204 No Con * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This does not include replica of segments. `forceMetadataRefresh` is required to be set. +over the given interval (or last 2 weeks if interval is not given). This does not include segment replication counts. `forceMetadataRefresh` is required to be set. Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. If no used segments are found for the given inputs, this API returns `204 No Content` @@ -147,7 +138,7 @@ If no used segments are found for the given inputs, this API returns `204 No Con * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}` Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource -over the given interval (or last 2 weeks if interval is not given). This include replica of segments. 
`forceMetadataRefresh` is required to be set. +over the given interval (or last 2 weeks if interval is not given). This includes segment replication counts. `forceMetadataRefresh` is required to be set. Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. If no used segments are found for the given inputs, this API returns `204 No Content` From 79acb14fe87c15c3cb795b83476a8f497917ddb0 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 15 Jun 2020 16:04:02 -1000 Subject: [PATCH 18/22] fix typo --- docs/ingestion/faq.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md index 2a2530f49629..7657dbb62437 100644 --- a/docs/ingestion/faq.md +++ b/docs/ingestion/faq.md @@ -70,11 +70,11 @@ Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following workflow. 1. Submit your ingestion task. -2. Repeatedly poll the [Overlord's task API](../operations/api-reference.html#tasks) ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. -3. Poll the [datasource loadstatus API](../operations/api-reference.html#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with +2. Repeatedly poll the [Overlord's task API](../operations/api-reference.md#tasks) ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. +3. Poll the [datasource loadstatus API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` and `interval=` once. 
If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. -4. Repeatedly poll the [datasource loadstatus API](../operations/api-reference.html#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with +4. Repeatedly poll the [datasource loadstatus API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false` and `interval=`. Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. From afac4fd572a0b1fa4fd0439c98d61c3f19524f7f Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Mon, 15 Jun 2020 17:46:30 -1000 Subject: [PATCH 19/22] fix spelling --- docs/ingestion/faq.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md index 7657dbb62437..2877ec286484 100644 --- a/docs/ingestion/faq.md +++ b/docs/ingestion/faq.md @@ -70,11 +70,11 @@ Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following workflow. 1. Submit your ingestion task. -2. Repeatedly poll the [Overlord's task API](../operations/api-reference.md#tasks) ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. -3. Poll the [datasource loadstatus API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with +2. Repeatedly poll the [Overlord's tasks API](../operations/api-reference.md#tasks) ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. +3. 
Poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` and `interval=` once. If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. -4. Repeatedly poll the [datasource loadstatus API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with +4. Repeatedly poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false` and `interval=`. Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. From fc77cc07dc22ec86e73961b582893750ca4bb8da Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 16 Jun 2020 15:39:32 -1000 Subject: [PATCH 20/22] address comments --- docs/ingestion/faq.md | 5 +++-- docs/operations/api-reference.md | 14 +++++++++++--- .../druid/metadata/SqlSegmentsMetadataManager.java | 2 +- .../druid/server/http/DataSourcesResource.java | 4 ++-- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md index 2877ec286484..0b08cca6fc99 100644 --- a/docs/ingestion/faq.md +++ b/docs/ingestion/faq.md @@ -66,13 +66,14 @@ Other common reasons that hand-off fails are as follows: Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, dependencies (that can be obtained by running command `hadoop classpath` on a machine where hadoop has been setup) in the classpath. And, provide necessary HDFS settings as described in [deep storage](../dependencies/deep-storage.md) . -## How do I know when I can make query to Druid after submitting ingestion task? 
+## How do I know when I can query Druid after submitting a batch ingestion task? You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following workflow. 1. Submit your ingestion task. 2. Repeatedly poll the [Overlord's tasks API](../operations/api-reference.md#tasks) ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed. 3. Poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with -`forceMetadataRefresh=true` and `interval=` once. +`forceMetadataRefresh=true` and `interval=` once. +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms of the load on the metadata store but is necessary to make sure that we verify all the latest segments' load status) If there are segments not yet loaded, continue to step 4, otherwise you can now query the data. 4. Repeatedly poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false` and `interval=`. diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 16544d5e0bae..50287c90a204 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -117,13 +117,17 @@ Returns the serialized JSON of segments to load and drop for each Historical pro #### Segment Loading by Datasource +Note that all _interval_ query parameters are ISO 8601 strings (e.g., 2016-06-27/2016-06-28).
+ ##### GET * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}` Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource over the given interval (or last 2 weeks if interval is not given). `forceMetadataRefresh` is required to be set. -Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. +Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms +of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status) Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. If no used segments are found for the given inputs, this API returns `204 No Content` @@ -131,7 +135,9 @@ If no used segments are found for the given inputs, this API returns `204 No Con Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource over the given interval (or last 2 weeks if interval is not given). This does not include segment replication counts. `forceMetadataRefresh` is required to be set. -Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. +Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. 
This can be a heavy operation in terms +of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status) Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. If no used segments are found for the given inputs, this API returns `204 No Content` @@ -139,7 +145,9 @@ If no used segments are found for the given inputs, this API returns `204 No Con Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource over the given interval (or last 2 weeks if interval is not given). This includes segment replication counts. `forceMetadataRefresh` is required to be set. -Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. +Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store +(Note: `forceMetadataRefresh=true` refreshes Coordinator's metadata cache of all datasources. This can be a heavy operation in terms +of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status) Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh. 
If no used segments are found for the given inputs, this API returns `204 No Content` diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 2f4dd3cc27a1..60f7f4b86eeb 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -432,7 +432,7 @@ private void useLatestIfWithinDelayOrPerformNewDatabasePoll() * made not longer than {@link #periodicPollDelay} from current time. * This method does wait untill completion for if the latest {@link DatabasePoll} is a * {@link PeriodicDatabasePoll} that has not completed it's first poll, or an {@link OnDemandDatabasePoll} that is - * alrady in the process of polling the database. + * already in the process of polling the database. * This means that any method using this check can read from snapshot that is * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old. 
*/ diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 95b8ecc450c4..8ea4dc06ce2e 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -99,6 +99,7 @@ public class DataSourcesResource { private static final Logger log = new Logger(DataSourcesResource.class); + private static final long DEFAULT_LOADSTATUS_INTERVAL_OFFSET = 14 * 24 * 60 * 60 * 1000; private final CoordinatorServerView serverInventoryView; private final SegmentsMetadataManager segmentsMetadataManager; @@ -417,9 +418,8 @@ public Response getDatasourceLoadstatus( } final Interval theInterval; if (interval == null) { - long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000; long currentTimeInMs = System.currentTimeMillis(); - theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs); + theInterval = Intervals.utc(currentTimeInMs - DEFAULT_LOADSTATUS_INTERVAL_OFFSET, currentTimeInMs); } else { theInterval = Intervals.of(interval.replace('_', '/')); } From 7b64a1aeda61c183baa42bbd9961b51511e63064 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 16 Jun 2020 15:46:18 -1000 Subject: [PATCH 21/22] address comments --- docs/ingestion/faq.md | 1 + docs/operations/api-reference.md | 2 ++ 2 files changed, 3 insertions(+) diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md index 0b08cca6fc99..5bb44dae76ba 100644 --- a/docs/ingestion/faq.md +++ b/docs/ingestion/faq.md @@ -78,6 +78,7 @@ If there are segments not yet loaded, continue to step 4, otherwise you can now 4. Repeatedly poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false` and `interval=`. 
Continue polling until all segments are loaded. Once all segments are loaded you can now query the data. +Note that this workflow only guarantees that the segments are avaiable at the time of the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) call. Segments can still become missing because of historical process failures or any other reasons afterward. ## I don't see my Druid segments on my Historical processes diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 50287c90a204..7eeaca44b321 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -118,6 +118,8 @@ Returns the serialized JSON of segments to load and drop for each Historical pro #### Segment Loading by Datasource Note that all _interval_ query parameters are ISO 8601 strings (e.g., 2016-06-27/2016-06-28). +Also note that these APIs only guarantees that the segments are avaiable at the time of the call. +Segments can still become missing because of historical process failures or any other reasons afterward. ##### GET From 65b60febe9e02f7ca44991aadf2fcf9fb2ab6094 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 16 Jun 2020 18:52:11 -1000 Subject: [PATCH 22/22] fix typo in docs --- docs/ingestion/faq.md | 2 +- docs/operations/api-reference.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md index 5bb44dae76ba..308407fa3426 100644 --- a/docs/ingestion/faq.md +++ b/docs/ingestion/faq.md @@ -78,7 +78,7 @@ If there are segments not yet loaded, continue to step 4, otherwise you can now 4. Repeatedly poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false` and `interval=`. Continue polling until all segments are loaded. 
Once all segments are loaded you can now query the data. -Note that this workflow only guarantees that the segments are avaiable at the time of the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) call. Segments can still become missing because of historical process failures or any other reasons afterward. +Note that this workflow only guarantees that the segments are available at the time of the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) call. Segments can still become missing because of historical process failures or any other reasons afterward. ## I don't see my Druid segments on my Historical processes diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 7eeaca44b321..a3610a8bc52a 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -118,7 +118,7 @@ Returns the serialized JSON of segments to load and drop for each Historical pro #### Segment Loading by Datasource Note that all _interval_ query parameters are ISO 8601 strings (e.g., 2016-06-27/2016-06-28). -Also note that these APIs only guarantees that the segments are avaiable at the time of the call. +Also note that these APIs only guarantee that the segments are available at the time of the call. Segments can still become missing because of historical process failures or any other reasons afterward. ##### GET