From 39dcd326be350ca6b66e4de884708cf77413c166 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 10 Apr 2019 22:28:02 -0700 Subject: [PATCH] Coordinator: Allow dropping all segments. Removes the coordinator sanity check that prevents it from dropping all segments. It's useful to get rid of this, since the behavior is unintuitive for dev/testing clusters where users might regularly want to drop all their data to get back to a clean slate. But the sanity check was there for a reason: to prevent a race condition where the coordinator might drop all segments if it ran before the first metadata store poll finished. This patch addresses that concern differently, by allowing methods in MetadataSegmentManager to return null if a poll has not happened yet, and canceling coordinator runs in that case. This patch also makes the "dataSources" reference in SQLMetadataSegmentManager volatile. I'm not sure why it wasn't volatile before, but it seems necessary to me: it's not final, and it's dereferenced from multiple threads without synchronization. --- docs/content/design/coordinator.md | 3 +- .../metadata/MetadataSegmentManager.java | 11 +++ .../metadata/SQLMetadataSegmentManager.java | 90 ++++++++++++------- .../server/coordinator/DruidCoordinator.java | 50 +++++++++-- .../DruidCoordinatorCleanupUnneeded.java | 22 ++--- .../helper/DruidCoordinatorHelper.java | 10 ++- .../DruidCoordinatorSegmentInfoLoader.java | 8 +- .../druid/server/http/MetadataResource.java | 11 ++- .../SQLMetadataSegmentManagerTest.java | 39 ++++++++ 9 files changed, 186 insertions(+), 58 deletions(-) diff --git a/docs/content/design/coordinator.md b/docs/content/design/coordinator.md index 810f212e604a..cf3c0f87a77e 100644 --- a/docs/content/design/coordinator.md +++ b/docs/content/design/coordinator.md @@ -52,8 +52,7 @@ Segments can be automatically loaded and dropped from the cluster based on a set ### Cleaning Up Segments -Each run, the Druid Coordinator compares the list of available database segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped. -Note that if all segments in database are deleted(or marked unused), then Coordinator will not drop anything from the Historicals. This is done to prevent a race condition in which the Coordinator would drop all segments if it started running cleanup before it finished polling the database for available segments for the first time and believed that there were no segments. +Each run, the Druid coordinator compares the list of available database segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped. ### Segment Availability diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java index 436ad125bf7d..a5584a9bc3fa 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java @@ -58,6 +58,13 @@ public interface MetadataSegmentManager @Nullable ImmutableDruidDataSource getDataSource(String dataSourceName); + /** + * Returns a collection of known datasources. + * + * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has + * not yet been polled.) + */ + @Nullable Collection getDataSources(); /** @@ -65,7 +72,11 @@ public interface MetadataSegmentManager * unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than * several times. + * + * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has + * not yet been polled.) */ + @Nullable Iterable iterateAllSegments(); Collection getAllDataSourceNames(); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index dff9c9df61ca..0bcf9face187 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -66,6 +66,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -73,6 +74,7 @@ import java.util.stream.Collectors; /** + * */ @ManageLifecycle public class SQLMetadataSegmentManager implements MetadataSegmentManager @@ -102,9 +104,14 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private final Supplier dbTables; private final SQLMetadataConnector connector; - private ConcurrentHashMap dataSources = new ConcurrentHashMap<>(); + // Volatile since this reference is reassigned in "poll" and then read from in other threads. + // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map) + @Nullable + private volatile ConcurrentHashMap dataSources = null; - /** The number of times this SQLMetadataSegmentManager was started. */ + /** + * The number of times this SQLMetadataSegmentManager was started. + */ private long startCount = 0; /** * Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if @@ -200,7 +207,7 @@ public void stop() return; } - dataSources = new ConcurrentHashMap<>(); + dataSources = null; currentStartOrder = -1; exec.shutdownNow(); exec = null; @@ -325,7 +332,7 @@ public boolean removeDataSource(final String dataSource) ).bind("dataSource", dataSource).execute() ); - dataSources.remove(dataSource); + Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource)); if (removed == 0) { return false; @@ -348,18 +355,21 @@ public boolean removeSegment(String dataSourceName, final String segmentId) // Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a // potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast. List possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId); - dataSources.computeIfPresent( - dataSourceName, - (dsName, dataSource) -> { - for (SegmentId possibleSegmentId : possibleSegmentIds) { - if (dataSource.removeSegment(possibleSegmentId) != null) { - break; - } - } - // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. - //noinspection ReturnOfNull - return dataSource.isEmpty() ? null : dataSource; - } + Optional.ofNullable(dataSources).ifPresent( + m -> + m.computeIfPresent( + dataSourceName, + (dsName, dataSource) -> { + for (SegmentId possibleSegmentId : possibleSegmentIds) { + if (dataSource.removeSegment(possibleSegmentId) != null) { + break; + } + } + // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. + //noinspection ReturnOfNull + return dataSource.isEmpty() ? null : dataSource; + } + ) ); return removed; @@ -375,14 +385,17 @@ public boolean removeSegment(SegmentId segmentId) { try { final boolean removed = removeSegmentFromTable(segmentId.toString()); - dataSources.computeIfPresent( - segmentId.getDataSource(), - (dsName, dataSource) -> { - dataSource.removeSegment(segmentId); - // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. - //noinspection ReturnOfNull - return dataSource.isEmpty() ? null : dataSource; - } + Optional.ofNullable(dataSources).ifPresent( + m -> + m.computeIfPresent( + segmentId.getDataSource(), + (dsName, dataSource) -> { + dataSource.removeSegment(segmentId); + // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. + //noinspection ReturnOfNull + return dataSource.isEmpty() ? null : dataSource; + } + ) ); return removed; } @@ -422,23 +435,37 @@ public boolean isStarted() @Nullable public ImmutableDruidDataSource getDataSource(String dataSourceName) { - final DruidDataSource dataSource = dataSources.get(dataSourceName); + final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null); return dataSource == null ? null : dataSource.toImmutableDruidDataSource(); } @Override + @Nullable public Collection getDataSources() { - return dataSources.values() - .stream() - .map(DruidDataSource::toImmutableDruidDataSource) - .collect(Collectors.toList()); + return Optional.ofNullable(dataSources) + .map(m -> + m.values() + .stream() + .map(DruidDataSource::toImmutableDruidDataSource) + .collect(Collectors.toList()) + ) + .orElse(null); } @Override + @Nullable public Iterable iterateAllSegments() { - return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator(); + final ConcurrentHashMap dataSourcesSnapshot = dataSources; + if (dataSourcesSnapshot == null) { + return null; + } + + return () -> dataSourcesSnapshot.values() + .stream() + .flatMap(dataSource -> dataSource.getSegments().stream()) + .iterator(); } @Override @@ -543,6 +570,7 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE .addSegmentIfAbsent(segment); }); + // Replace "dataSources" atomically. dataSources = newDataSources; } @@ -557,7 +585,7 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE */ private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment) { - DruidDataSource dataSource = dataSources.get(segment.getDataSource()); + DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null); if (dataSource == null) { return segment; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 3b5faef7a04b..27bee2f0d225 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -74,8 +74,10 @@ import org.joda.time.DateTime; import org.joda.time.Duration; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -88,6 +90,7 @@ import java.util.stream.Collectors; /** + * */ @ManageLifecycle public class DruidCoordinator @@ -242,7 +245,9 @@ public Map getLoadManagementPeons() return loadManagementPeons; } - /** @return tier -> { dataSource -> underReplicationCount } map */ + /** + * @return tier -> { dataSource -> underReplicationCount } map + */ public Map> computeUnderReplicationCountsPerDataSourcePerTier() { final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); @@ -251,9 +256,15 @@ public Map> computeUnderReplicationCountsPerDataS return underReplicationCountsPerDataSourcePerTier; } + final Iterable dataSegments = iterateAvailableDataSegments(); + + if (dataSegments == null) { + return underReplicationCountsPerDataSourcePerTier; + } + final DateTime now = DateTimes.nowUtc(); - for (final DataSegment segment : iterateAvailableDataSegments()) { + for (final DataSegment segment : dataSegments) { final List rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource()); for (final Rule rule : rules) { @@ -285,7 +296,13 @@ public Object2LongMap getSegmentAvailability() return retVal; } - for (DataSegment segment : iterateAvailableDataSegments()) { + final Iterable dataSegments = iterateAvailableDataSegments(); + + if (dataSegments == null) { + return retVal; + } + + for (DataSegment segment : dataSegments) { if (segmentReplicantLookup.getLoadedReplicants(segment.getId()) == 0) { retVal.addTo(segment.getDataSource(), 1); } else { @@ -298,8 +315,14 @@ public Object2LongMap getSegmentAvailability() public Map getLoadStatus() { - Map loadStatus = new HashMap<>(); - for (ImmutableDruidDataSource dataSource : metadataSegmentManager.getDataSources()) { + final Map loadStatus = new HashMap<>(); + final Collection dataSources = metadataSegmentManager.getDataSources(); + + if (dataSources == null) { + return loadStatus; + } + + for (ImmutableDruidDataSource dataSource : dataSources) { final Set segments = Sets.newHashSet(dataSource.getSegments()); final int availableSegmentSize = segments.size(); @@ -453,7 +476,11 @@ public void moveSegment( * is unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than * several times. + * + * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has + * not yet been polled.) */ + @Nullable public Iterable iterateAvailableDataSegments() { return metadataSegmentManager.iterateAllSegments(); @@ -643,10 +670,16 @@ public void run() BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); // Do coordinator stuff. + final Collection dataSources = metadataSegmentManager.getDataSources(); + if (dataSources == null) { + log.info("Metadata store not polled yet, skipping this run."); + return; + } + DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() .withStartTime(startTime) - .withDataSources(metadataSegmentManager.getDataSources()) + .withDataSources(dataSources) .withDynamicConfigs(getDynamicConfigs()) .withCompactionConfig(getCompactionConfig()) .withEmitter(emitter) @@ -656,6 +689,11 @@ public void run() // Don't read state and run state in the same helper otherwise racy conditions may exist if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { params = helper.run(params); + + if (params == null) { + // This helper wanted to cancel the run. No log message, since the helper should have logged a reason. + return; + } } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java index a7a1bcce2205..2e77577b69b0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java @@ -33,6 +33,7 @@ import java.util.SortedSet; /** + * */ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper { @@ -45,21 +46,12 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) Set availableSegments = params.getAvailableSegments(); DruidCluster cluster = params.getDruidCluster(); - if (availableSegments.isEmpty()) { - log.info( - "Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent " + - "a race condition in which the coordinator would drop all segments if it started running cleanup before " + - "it finished polling the metadata storage for available segments for the first time." - ); - return params.buildFromExisting().withCoordinatorStats(stats).build(); - } - - // Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It might - // not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any - // segments at all, we should have all of them.) - // Note that if metadata store has no segments, then availableSegments will stay empty and nothing will be dropped. - // This is done to prevent a race condition in which the coordinator would drop all segments if it started running - // cleanup before it finished polling the metadata storage for available segments for the first time. + // Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It's + // also filled atomically, so if there are any segments at all, we should have all of them.) + // + // Note that if the metadata store has not been polled yet, "getAvailableSegments" would throw an error since + // "availableSegments" is null. But this won't happen, since the earlier helper "DruidCoordinatorSegmentInfoLoader" + // would have canceled the run. for (SortedSet serverHolders : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serverHolders) { ImmutableDruidServer server = serverHolder.getServer(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java index a2752c352708..78ee92bde323 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorHelper.java @@ -21,7 +21,10 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import javax.annotation.Nullable; + /** + * */ public interface DruidCoordinatorHelper { @@ -29,8 +32,13 @@ public interface DruidCoordinatorHelper * Implementations of this method run various activities performed by the coordinator. * Input params can be used and modified. They are typically in a list and returned * DruidCoordinatorRuntimeParams is passed to the next helper. + * * @param params - * @return same as input or a modified value to be used by next helper. + * + * @return same as input or a modified value to be used by next helper. Null return + * values will prevent future DruidCoordinatorHelpers from running until the next + * cycle. */ + @Nullable DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java index 801bd3ba5021..2353247e3f30 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java @@ -43,6 +43,12 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { log.info("Starting coordination. Getting available segments."); + final Iterable dataSegments = coordinator.iterateAvailableDataSegments(); + if (dataSegments == null) { + log.info("Metadata store not polled yet, canceling this run."); + return null; + } + // The following transform() call doesn't actually transform the iterable. It only checks the sizes of the segments // and emits alerts if segments with negative sizes are encountered. In other words, semantically it's similar to // Stream.peek(). It works as long as DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() (which is called @@ -54,7 +60,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // //noinspection StaticPseudoFunctionalStyleMethod: https://youtrack.jetbrains.com/issue/IDEA-153047 Iterable availableSegmentsWithSizeChecking = Iterables.transform( - coordinator.iterateAvailableDataSegments(), + dataSegments, segment -> { if (segment.getSize() < 0) { log.makeAlert("No size on a segment") diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index c7e270214ff4..af106fb863fb 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -55,6 +55,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -96,7 +97,10 @@ public Response getDatabaseDataSources( @Context final HttpServletRequest req ) { - final Collection druidDataSources = metadataSegmentManager.getDataSources(); + // If we haven't polled the metadata store yet, use an empty list of datasources. + final Collection druidDataSources = Optional.ofNullable(metadataSegmentManager.getDataSources()) + .orElse(Collections.emptyList()); + final Set dataSourceNamesPreAuth; if (includeDisabled != null) { dataSourceNamesPreAuth = new TreeSet<>(metadataSegmentManager.getAllDataSourceNames()); @@ -154,7 +158,10 @@ public Response getDatabaseSegments( @QueryParam("datasources") final Set datasources ) { - Collection druidDataSources = metadataSegmentManager.getDataSources(); + // If we haven't polled the metadata store yet, use an empty list of datasources. + Collection druidDataSources = Optional.ofNullable(metadataSegmentManager.getDataSources()) + .orElse(Collections.emptyList()); + if (datasources != null && !datasources.isEmpty()) { druidDataSources = druidDataSources.stream() .filter(src -> datasources.contains(src.getName())) diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java index a9f8f3c5df00..242dc5ecb277 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java @@ -39,6 +39,7 @@ import org.junit.Test; import java.io.IOException; +import java.util.stream.Collectors; public class SQLMetadataSegmentManagerTest @@ -123,10 +124,48 @@ public void testPoll() ImmutableList.of("wikipedia"), manager.getAllDataSourceNames() ); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + manager.getDataSources().stream().map(d -> d.getName()).collect(Collectors.toList()) + ); Assert.assertEquals( ImmutableSet.of(segment1, segment2), ImmutableSet.copyOf(manager.getDataSource("wikipedia").getSegments()) ); + Assert.assertEquals( + ImmutableSet.of(segment1, segment2), + ImmutableSet.copyOf(manager.iterateAllSegments()) + ); + } + + @Test + public void testNoPoll() + { + manager.start(); + Assert.assertTrue(manager.isStarted()); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + manager.getAllDataSourceNames() + ); + Assert.assertNull(manager.getDataSources()); + Assert.assertNull(manager.getDataSource("wikipedia")); + Assert.assertNull(manager.iterateAllSegments()); + } + + @Test + public void testPollThenStop() + { + manager.start(); + manager.poll(); + manager.stop(); + Assert.assertFalse(manager.isStarted()); + Assert.assertEquals( + ImmutableList.of("wikipedia"), + manager.getAllDataSourceNames() + ); + Assert.assertNull(manager.getDataSources()); + Assert.assertNull(manager.getDataSource("wikipedia")); + Assert.assertNull(manager.iterateAllSegments()); } @Test