From 6025bcdd4ee6addac69dd51cb2382c4e615f9b7d Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sun, 5 May 2019 19:43:12 -0700 Subject: [PATCH 01/18] Move the overshadowed segment computation to SQLMetadataSegmentManager's poll --- ...=> DataSegmentWithOvershadowedStatus.java} | 12 +++--- .../client/ImmutableDruidDataSource.java | 40 ------------------- .../metadata/MetadataSegmentManager.java | 9 +++++ .../metadata/SQLMetadataSegmentManager.java | 39 ++++++++++++++++++ .../server/coordinator/DruidCoordinator.java | 5 +++ .../helper/DruidCoordinatorRuleRunner.java | 11 +++-- .../druid/server/http/MetadataResource.java | 35 +++++++--------- .../calcite/schema/MetadataSegmentView.java | 18 ++++----- .../sql/calcite/schema/SystemSchema.java | 23 ++++++----- .../sql/calcite/schema/SystemSchemaTest.java | 14 +++---- 10 files changed, 109 insertions(+), 97 deletions(-) rename core/src/main/java/org/apache/druid/timeline/{SegmentWithOvershadowedStatus.java => DataSegmentWithOvershadowedStatus.java} (81%) diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/DataSegmentWithOvershadowedStatus.java similarity index 81% rename from core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java rename to core/src/main/java/org/apache/druid/timeline/DataSegmentWithOvershadowedStatus.java index e86daea7d86e..b4edb43fc8b4 100644 --- a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java +++ b/core/src/main/java/org/apache/druid/timeline/DataSegmentWithOvershadowedStatus.java @@ -25,16 +25,16 @@ /** * DataSegment object plus the overshadowed status for the segment. An immutable object. * - * SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId} + * DataSegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId} * of the DataSegment object. 
*/ -public class SegmentWithOvershadowedStatus implements Comparable +public class DataSegmentWithOvershadowedStatus implements Comparable { private final boolean overshadowed; private final DataSegment dataSegment; @JsonCreator - public SegmentWithOvershadowedStatus( + public DataSegmentWithOvershadowedStatus( @JsonProperty("dataSegment") DataSegment dataSegment, @JsonProperty("overshadowed") boolean overshadowed ) @@ -61,10 +61,10 @@ public boolean equals(Object o) if (this == o) { return true; } - if (!(o instanceof SegmentWithOvershadowedStatus)) { + if (!(o instanceof DataSegmentWithOvershadowedStatus)) { return false; } - final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o; + final DataSegmentWithOvershadowedStatus that = (DataSegmentWithOvershadowedStatus) o; if (!dataSegment.equals(that.dataSegment)) { return false; } @@ -83,7 +83,7 @@ public int hashCode() } @Override - public int compareTo(SegmentWithOvershadowedStatus o) + public int compareTo(DataSegmentWithOvershadowedStatus o) { return dataSegment.getId().compareTo(o.dataSegment.getId()); } diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java index 841b7169458e..59539443b618 100644 --- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java +++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java @@ -25,17 +25,12 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; -import com.google.common.collect.Ordering; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.VersionedIntervalTimeline; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Set; /** * An immutable 
collection of metadata of segments ({@link DataSegment} objects), belonging to a particular data source. @@ -114,41 +109,6 @@ public long getTotalSizeOfSegments() return totalSizeOfSegments; } - /** - * This method finds the overshadowed segments from the given segments - * - * @return set of overshadowed segments - */ - public static Set determineOvershadowedSegments(Iterable segments) - { - final Map> timelines = buildTimelines(segments); - - final Set overshadowedSegments = new HashSet<>(); - for (DataSegment dataSegment : segments) { - final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); - if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { - overshadowedSegments.add(dataSegment); - } - } - return overshadowedSegments; - } - - /** - * Builds a timeline from given segments - * - * @return map of datasource to VersionedIntervalTimeline of segments - */ - private static Map> buildTimelines( - Iterable segments - ) - { - final Map> timelines = new HashMap<>(); - segments.forEach(segment -> timelines - .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) - .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); - return timelines; - } - @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java index 4fc517718f2a..be2c5326d25f 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java @@ -98,6 +98,15 @@ public interface MetadataSegmentManager Collection getAllDataSourceNames(); + /** + * Returns a collection of overshadowed segments + * + * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying 
metadata store has + * not yet been polled.) + */ + @Nullable + Collection getOvershadowedSegments(); + /** * Returns top N unused segment intervals in given interval when ordered by segment start time, end time. */ diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index 35843c95b9dd..c3ad3cda8f4f 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; import com.google.inject.Inject; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.ImmutableDruidDataSource; @@ -60,11 +62,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -108,6 +113,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager // null and nonnull multiple times as stop() and start() are called. @Nullable private volatile ConcurrentHashMap dataSources = null; + @Nullable + private volatile ImmutableSet overshadowedSegments = null; /** * The number of times this SQLMetadataSegmentManager was started. 
@@ -640,6 +647,12 @@ public Iterable iterateAllSegments() .iterator(); } + @Override + public Set getOvershadowedSegments() + { + return overshadowedSegments; + } + @Override public Collection getAllDataSourceNames() { @@ -744,6 +757,32 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE // Replace "dataSources" atomically. dataSources = newDataSources; + overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments(segments)); + } + + /** + * This method builds a timeline from given segments and finds the overshadowed segments + * + * @return set of overshadowed segments + */ + private Set determineOvershadowedSegments(Iterable segments) + { + final Map> timelines = new HashMap<>(); + segments.forEach(segment -> timelines + .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) + .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); + + // It's fine to add all overshadowed segments to a single collection because only + // a small fraction of the segments in the cluster are expected to be overshadowed, + // so building this collection shouldn't generate a lot of garbage. 
+ final Set overshadowedSegments = new HashSet<>(); + for (DataSegment dataSegment : segments) { + final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); + if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { + overshadowedSegments.add(dataSegment); + } + } + return overshadowedSegments; } /** diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 27bee2f0d225..a1c3caad3ea9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -245,6 +245,11 @@ public Map getLoadManagementPeons() return loadManagementPeons; } + public MetadataSegmentManager getMetadataSegmentManager() + { + return metadataSegmentManager; + } + /** * @return tier -> { dataSource -> underReplicationCount } map */ diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index bbceaafed273..5618b78477d2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -20,7 +20,6 @@ package org.apache.druid.server.coordinator.helper; import com.google.common.collect.Lists; -import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataRuleManager; @@ -34,8 +33,10 @@ import org.apache.druid.timeline.SegmentId; import org.joda.time.DateTime; +import java.util.Collection; +import java.util.Collections; import java.util.List; -import 
java.util.Set; +import java.util.Optional; /** */ @@ -84,8 +85,10 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // find available segments which are not overshadowed by other segments in DB // only those would need to be loaded/dropped // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed - final Set overshadowed = ImmutableDruidDataSource - .determineOvershadowedSegments(params.getAvailableSegments()); + // If metadata store hasn't been polled yet, use empty overshadowed list + final Collection overshadowed = Optional + .ofNullable(coordinator.getMetadataSegmentManager().getOvershadowedSegments()) + .orElse(Collections.emptyList()); for (String tier : cluster.getTierNames()) { replicatorThrottler.updateReplicationState(tier); diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 3c7e8ac37b83..ffc82159fd09 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -36,8 +36,8 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegmentWithOvershadowedStatus; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; @@ -52,7 +52,6 @@ import javax.ws.rs.core.Response; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -163,11 +162,11 @@ public Response getDatabaseSegments( final Stream metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream()); if 
(includeOvershadowedStatus != null) { - final Iterable authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus( - req, - druidDataSources, - metadataSegments - ); + final Iterable authorizedSegments = + findAuthorizedSegmentWithOvershadowedStatus( + req, + metadataSegments + ); Response.ResponseBuilder builder = Response.status(Response.Status.OK); return builder.entity(authorizedSegments).build(); } else { @@ -187,30 +186,26 @@ public Response getDatabaseSegments( } } - private Iterable findAuthorizedSegmentWithOvershadowedStatus( + private Iterable findAuthorizedSegmentWithOvershadowedStatus( HttpServletRequest req, - Collection druidDataSources, Stream metadataSegments ) { - // It's fine to add all overshadowed segments to a single collection because only - // a small fraction of the segments in the cluster are expected to be overshadowed, - // so building this collection shouldn't generate a lot of garbage. - final Set overshadowedSegments = new HashSet<>(); - for (ImmutableDruidDataSource dataSource : druidDataSources) { - overshadowedSegments.addAll(ImmutableDruidDataSource.determineOvershadowedSegments(dataSource.getSegments())); - } + // If metadata store hasn't been polled yet, use empty overshadowed list + final Collection overshadowedSegments = Optional + .ofNullable(metadataSegmentManager.getOvershadowedSegments()) + .orElse(Collections.emptyList()); - final Stream segmentsWithOvershadowedStatus = metadataSegments - .map(segment -> new SegmentWithOvershadowedStatus( + final Stream segmentsWithOvershadowedStatus = metadataSegments + .map(segment -> new DataSegmentWithOvershadowedStatus( segment, overshadowedSegments.contains(segment) )); - final Function> raGenerator = segment -> Collections + final Function> raGenerator = segment -> Collections .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + 
final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( req, segmentsWithOvershadowedStatus::iterator, raGenerator, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 18d288eb6915..29a1fcf2e574 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -44,7 +44,7 @@ import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentWithOvershadowedStatus; +import org.apache.druid.timeline.DataSegmentWithOvershadowedStatus; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -80,7 +80,7 @@ public class MetadataSegmentView * from other threads. 
*/ @MonotonicNonNull - private volatile ImmutableSortedSet publishedSegments = null; + private volatile ImmutableSortedSet publishedSegments = null; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -139,18 +139,18 @@ public void stop() private void poll() { log.info("polling published segments from coordinator"); - final JsonParserIterator metadataSegments = getMetadataSegments( + final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, responseHandler, segmentWatcherConfig.getWatchedDataSources() ); - final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder(); + final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder(); while (metadataSegments.hasNext()) { - final SegmentWithOvershadowedStatus segment = metadataSegments.next(); + final DataSegmentWithOvershadowedStatus segment = metadataSegments.next(); final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); - final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus( + final DataSegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new DataSegmentWithOvershadowedStatus( interned, segment.isOvershadowed() ); @@ -160,7 +160,7 @@ private void poll() cachePopulated.countDown(); } - public Iterator getPublishedSegments() + public Iterator getPublishedSegments() { if (isCacheEnabled) { Uninterruptibles.awaitUninterruptibly(cachePopulated); @@ -176,7 +176,7 @@ public Iterator getPublishedSegments() } // Note that coordinator must be up to get segments - private JsonParserIterator getMetadataSegments( + private JsonParserIterator getMetadataSegments( DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler, @@ -210,7 +210,7 @@ private JsonParserIterator getMetadataSegments( responseHandler ); - 
final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() + final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() { }); return new JsonParserIterator<>( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index e5cfa911c452..6c2b24841af1 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -67,8 +67,8 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegmentWithOvershadowedStatus; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.jboss.netty.handler.codec.http.HttpMethod; import javax.annotation.Nullable; @@ -92,7 +92,7 @@ public class SystemSchema extends AbstractSchema private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; - private static final Function> + private static final Function> SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment -> Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply( segment.getDataSegment().getDataSource()) @@ -255,7 +255,7 @@ public Enumerable scan(DataContext root) } //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator - final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); + final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); final Set segmentsAlreadySeen = new HashSet<>(); @@ -340,20 +340,21 @@ public Enumerable scan(DataContext root) } - private Iterator getAuthorizedPublishedSegments( - Iterator it, + private Iterator 
getAuthorizedPublishedSegments( + Iterator it, DataContext root ) { final AuthenticationResult authenticationResult = (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( - authenticationResult, - () -> it, - SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR, - authorizerMapper - ); + final Iterable authorizedSegments = AuthorizationUtils + .filterAuthorizedResources( + authenticationResult, + () -> it, + SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR, + authorizerMapper + ); return authorizedSegments.iterator(); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index a942db4b556c..51dd9fb3ef7e 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -75,8 +75,8 @@ import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegmentWithOvershadowedStatus; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -409,12 +409,12 @@ public void testSegmentsTable() .withConstructor(druidSchema, metadataView, mapper, authMapper) .createMock(); EasyMock.replay(segmentsTable); - final Set publishedSegments = Stream.of( - new SegmentWithOvershadowedStatus(publishedSegment1, true), - new SegmentWithOvershadowedStatus(publishedSegment2, false), - new SegmentWithOvershadowedStatus(publishedSegment3, false), - new SegmentWithOvershadowedStatus(segment1, true), - new 
SegmentWithOvershadowedStatus(segment2, false) + final Set publishedSegments = Stream.of( + new DataSegmentWithOvershadowedStatus(publishedSegment1, true), + new DataSegmentWithOvershadowedStatus(publishedSegment2, false), + new DataSegmentWithOvershadowedStatus(publishedSegment3, false), + new DataSegmentWithOvershadowedStatus(segment1, true), + new DataSegmentWithOvershadowedStatus(segment2, false) ).collect(Collectors.toSet()); EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once(); From 1964b3329917cfbd4bca7159a2e25774a1a3ad04 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sun, 5 May 2019 19:46:34 -0700 Subject: [PATCH 02/18] rename method in MetadataSegmentManager --- .../java/org/apache/druid/metadata/MetadataSegmentManager.java | 2 +- .../org/apache/druid/metadata/SQLMetadataSegmentManager.java | 2 +- .../server/coordinator/helper/DruidCoordinatorRuleRunner.java | 2 +- .../java/org/apache/druid/server/http/MetadataResource.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java index be2c5326d25f..044aec794e72 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java @@ -105,7 +105,7 @@ public interface MetadataSegmentManager * not yet been polled.) */ @Nullable - Collection getOvershadowedSegments(); + Collection findOvershadowedSegments(); /** * Returns top N unused segment intervals in given interval when ordered by segment start time, end time. 
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index c3ad3cda8f4f..fa85608755b4 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -648,7 +648,7 @@ public Iterable iterateAllSegments() } @Override - public Set getOvershadowedSegments() + public Set findOvershadowedSegments() { return overshadowedSegments; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index 5618b78477d2..433637ece49e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -87,7 +87,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed // If metadata store hasn't been polled yet, use empty overshadowed list final Collection overshadowed = Optional - .ofNullable(coordinator.getMetadataSegmentManager().getOvershadowedSegments()) + .ofNullable(coordinator.getMetadataSegmentManager().findOvershadowedSegments()) .orElse(Collections.emptyList()); for (String tier : cluster.getTierNames()) { diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index ffc82159fd09..98b5b3362fd9 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -193,7 +193,7 @@ private Iterable findAuthorizedSegmentWithOve { // 
If metadata store hasn't been polled yet, use empty overshadowed list final Collection overshadowedSegments = Optional - .ofNullable(metadataSegmentManager.getOvershadowedSegments()) + .ofNullable(metadataSegmentManager.findOvershadowedSegments()) .orElse(Collections.emptyList()); final Stream segmentsWithOvershadowedStatus = metadataSegments From a32dc084d1f5d89c5037bb5928baa19f20aa05d8 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 6 May 2019 11:25:14 -0700 Subject: [PATCH 03/18] Fix tests --- .../DruidCoordinatorRuleRunnerTest.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 51a79307c752..c23a9d63f251 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.metadata.MetadataRuleManager; +import org.apache.druid.metadata.MetadataSegmentManager; import org.apache.druid.segment.IndexIO; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; @@ -67,6 +69,7 @@ public class DruidCoordinatorRuleRunnerTest private DruidCoordinatorRuleRunner ruleRunner; private ServiceEmitter emitter; private MetadataRuleManager 
databaseRuleManager; + private MetadataSegmentManager databaseSegmentManager; @Before public void setUp() @@ -76,6 +79,7 @@ public void setUp() emitter = EasyMock.createMock(ServiceEmitter.class); EmittingLogger.registerEmitter(emitter); databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); + databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class); DateTime start = DateTimes.of("2012-01-01"); availableSegments = new ArrayList<>(); @@ -557,6 +561,7 @@ public void testDropRemove() EasyMock.expectLastCall().atLeastOnce(); mockEmptyPeon(); + EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); @@ -989,7 +994,10 @@ public void testDropServerActuallyServesSegment() @Test public void testReplicantThrottle() { - mockCoordinator(); + EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); + EasyMock.expect(databaseSegmentManager.findOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); + EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); + EasyMock.replay(coordinator, databaseSegmentManager); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); mockEmptyPeon(); @@ -1114,6 +1122,7 @@ public void testReplicantThrottleAcrossTiers() .build() ) .atLeastOnce(); + EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); EasyMock.replay(coordinator); @@ -1331,7 +1340,10 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() availableSegments.add(v1); availableSegments.add(v2); - mockCoordinator(); + 
EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); + EasyMock.expect(databaseSegmentManager.findOvershadowedSegments()).andReturn(ImmutableSet.of(v1)).anyTimes(); + EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); + EasyMock.replay(coordinator, databaseSegmentManager); mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject()); EasyMock.expectLastCall().once(); mockEmptyPeon(); @@ -1395,6 +1407,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() private void mockCoordinator() { + EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); From 258bacc16076f116b8e020c74822f59b94979bb2 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 8 May 2019 16:36:27 -0700 Subject: [PATCH 04/18] PR comments --- .../apache/druid/metadata/MetadataSegmentManager.java | 2 +- .../apache/druid/metadata/SQLMetadataSegmentManager.java | 9 ++++----- .../coordinator/helper/DruidCoordinatorRuleRunner.java | 2 +- .../org/apache/druid/server/http/MetadataResource.java | 2 +- .../coordinator/DruidCoordinatorRuleRunnerTest.java | 4 ++-- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java index 044aec794e72..be2c5326d25f 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java @@ -105,7 +105,7 @@ public interface MetadataSegmentManager * not yet been polled.) 
*/ @Nullable - Collection findOvershadowedSegments(); + Collection getOvershadowedSegments(); /** * Returns top N unused segment intervals in given interval when ordered by segment start time, end time. diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index fa85608755b4..2991dcc6307b 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -63,7 +63,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -648,7 +647,7 @@ public Iterable iterateAllSegments() } @Override - public Set findOvershadowedSegments() + public Set getOvershadowedSegments() { return overshadowedSegments; } @@ -763,9 +762,9 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE /** * This method builds a timeline from given segments and finds the overshadowed segments * - * @return set of overshadowed segments + * @return overshadowed segments list */ - private Set determineOvershadowedSegments(Iterable segments) + private List determineOvershadowedSegments(Iterable segments) { final Map> timelines = new HashMap<>(); segments.forEach(segment -> timelines @@ -775,7 +774,7 @@ private Set determineOvershadowedSegments(Iterable seg // It's fine to add all overshadowed segments to a single collection because only // a small fraction of the segments in the cluster are expected to be overshadowed, // so building this collection shouldn't generate a lot of garbage. 
- final Set overshadowedSegments = new HashSet<>(); + final List overshadowedSegments = new ArrayList<>(); for (DataSegment dataSegment : segments) { final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index 433637ece49e..5618b78477d2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -87,7 +87,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed // If metadata store hasn't been polled yet, use empty overshadowed list final Collection overshadowed = Optional - .ofNullable(coordinator.getMetadataSegmentManager().findOvershadowedSegments()) + .ofNullable(coordinator.getMetadataSegmentManager().getOvershadowedSegments()) .orElse(Collections.emptyList()); for (String tier : cluster.getTierNames()) { diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 98b5b3362fd9..ffc82159fd09 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -193,7 +193,7 @@ private Iterable findAuthorizedSegmentWithOve { // If metadata store hasn't been polled yet, use empty overshadowed list final Collection overshadowedSegments = Optional - .ofNullable(metadataSegmentManager.findOvershadowedSegments()) + 
.ofNullable(metadataSegmentManager.getOvershadowedSegments()) .orElse(Collections.emptyList()); final Stream segmentsWithOvershadowedStatus = metadataSegments diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index c23a9d63f251..4d60bfb15571 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -995,7 +995,7 @@ public void testDropServerActuallyServesSegment() public void testReplicantThrottle() { EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); - EasyMock.expect(databaseSegmentManager.findOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); + EasyMock.expect(databaseSegmentManager.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); EasyMock.replay(coordinator, databaseSegmentManager); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); @@ -1341,7 +1341,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() availableSegments.add(v2); EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); - EasyMock.expect(databaseSegmentManager.findOvershadowedSegments()).andReturn(ImmutableSet.of(v1)).anyTimes(); + EasyMock.expect(databaseSegmentManager.getOvershadowedSegments()).andReturn(ImmutableSet.of(v1)).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); EasyMock.replay(coordinator, databaseSegmentManager); mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject()); From fd7fddea4f855ca76482c76ba80470c7b8d71b69 Mon Sep 17 00:00:00 2001 From: Surekha Saharan 
Date: Sat, 25 May 2019 15:30:40 -0700 Subject: [PATCH 05/18] PR comments --- ...java => SegmentWithOvershadowedStatus.java} | 10 +++++----- .../metadata/SQLMetadataSegmentManager.java | 8 +++++--- .../druid/server/http/MetadataResource.java | 14 +++++++------- .../calcite/schema/MetadataSegmentView.java | 18 +++++++++--------- .../druid/sql/calcite/schema/SystemSchema.java | 12 ++++++------ .../sql/calcite/schema/SystemSchemaTest.java | 14 +++++++------- 6 files changed, 39 insertions(+), 37 deletions(-) rename core/src/main/java/org/apache/druid/timeline/{DataSegmentWithOvershadowedStatus.java => SegmentWithOvershadowedStatus.java} (85%) diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java similarity index 85% rename from core/src/main/java/org/apache/druid/timeline/DataSegmentWithOvershadowedStatus.java rename to core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java index b4edb43fc8b4..7074e1dc0c7e 100644 --- a/core/src/main/java/org/apache/druid/timeline/DataSegmentWithOvershadowedStatus.java +++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java @@ -28,13 +28,13 @@ * DataSegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId} * of the DataSegment object. 
*/ -public class DataSegmentWithOvershadowedStatus implements Comparable +public class SegmentWithOvershadowedStatus implements Comparable { private final boolean overshadowed; private final DataSegment dataSegment; @JsonCreator - public DataSegmentWithOvershadowedStatus( + public SegmentWithOvershadowedStatus( @JsonProperty("dataSegment") DataSegment dataSegment, @JsonProperty("overshadowed") boolean overshadowed ) @@ -61,10 +61,10 @@ public boolean equals(Object o) if (this == o) { return true; } - if (!(o instanceof DataSegmentWithOvershadowedStatus)) { + if (!(o instanceof SegmentWithOvershadowedStatus)) { return false; } - final DataSegmentWithOvershadowedStatus that = (DataSegmentWithOvershadowedStatus) o; + final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o; if (!dataSegment.equals(that.dataSegment)) { return false; } @@ -83,7 +83,7 @@ public int hashCode() } @Override - public int compareTo(DataSegmentWithOvershadowedStatus o) + public int compareTo(SegmentWithOvershadowedStatus o) { return dataSegment.getId().compareTo(o.dataSegment.getId()); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index 2991dcc6307b..9dd964d794ce 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -214,6 +214,7 @@ public void stop() } dataSources = null; + overshadowedSegments = null; currentStartOrder = -1; exec.shutdownNow(); exec = null; @@ -455,8 +456,6 @@ public boolean removeDataSource(final String dataSource) ).bind("dataSource", dataSource).execute() ); - Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource)); - if (removed == 0) { return false; } @@ -756,11 +755,14 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE // Replace "dataSources" atomically. 
dataSources = newDataSources; + // the overshadowedSegments could become invalid if dataSources is updated outside of + // this method, currently both dataSources and overshadowedSegments are updated here + // and in stop() overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments(segments)); } /** - * This method builds a timeline from given segments and finds the overshadowed segments + * This method builds timelines from given segments and finds the overshadowed segments list * * @return overshadowed segments list */ diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index ffc82159fd09..279bb7a40642 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -36,8 +36,8 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.DataSegmentWithOvershadowedStatus; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; @@ -162,7 +162,7 @@ public Response getDatabaseSegments( final Stream metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream()); if (includeOvershadowedStatus != null) { - final Iterable authorizedSegments = + final Iterable authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus( req, metadataSegments @@ -186,7 +186,7 @@ public Response getDatabaseSegments( } } - private Iterable findAuthorizedSegmentWithOvershadowedStatus( + private Iterable findAuthorizedSegmentWithOvershadowedStatus( HttpServletRequest req, Stream metadataSegments ) @@ -196,16 +196,16 @@ private Iterable findAuthorizedSegmentWithOve 
.ofNullable(metadataSegmentManager.getOvershadowedSegments()) .orElse(Collections.emptyList()); - final Stream segmentsWithOvershadowedStatus = metadataSegments - .map(segment -> new DataSegmentWithOvershadowedStatus( + final Stream segmentsWithOvershadowedStatus = metadataSegments + .map(segment -> new SegmentWithOvershadowedStatus( segment, overshadowedSegments.contains(segment) )); - final Function> raGenerator = segment -> Collections + final Function> raGenerator = segment -> Collections .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( req, segmentsWithOvershadowedStatus::iterator, raGenerator, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 29a1fcf2e574..18d288eb6915 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -44,7 +44,7 @@ import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.DataSegmentWithOvershadowedStatus; +import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -80,7 +80,7 @@ public class MetadataSegmentView * from other threads. 
*/ @MonotonicNonNull - private volatile ImmutableSortedSet publishedSegments = null; + private volatile ImmutableSortedSet publishedSegments = null; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -139,18 +139,18 @@ public void stop() private void poll() { log.info("polling published segments from coordinator"); - final JsonParserIterator metadataSegments = getMetadataSegments( + final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, responseHandler, segmentWatcherConfig.getWatchedDataSources() ); - final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder(); + final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder(); while (metadataSegments.hasNext()) { - final DataSegmentWithOvershadowedStatus segment = metadataSegments.next(); + final SegmentWithOvershadowedStatus segment = metadataSegments.next(); final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); - final DataSegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new DataSegmentWithOvershadowedStatus( + final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus( interned, segment.isOvershadowed() ); @@ -160,7 +160,7 @@ private void poll() cachePopulated.countDown(); } - public Iterator getPublishedSegments() + public Iterator getPublishedSegments() { if (isCacheEnabled) { Uninterruptibles.awaitUninterruptibly(cachePopulated); @@ -176,7 +176,7 @@ public Iterator getPublishedSegments() } // Note that coordinator must be up to get segments - private JsonParserIterator getMetadataSegments( + private JsonParserIterator getMetadataSegments( DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler, @@ -210,7 +210,7 @@ private JsonParserIterator getMetadataSegment responseHandler ); - final 
JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() + final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() { }); return new JsonParserIterator<>( diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 57df5ccf29b1..87ff8a9fcdb4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -73,8 +73,8 @@ import org.apache.druid.sql.calcite.planner.PlannerContext; import org.apache.druid.sql.calcite.table.RowSignature; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.DataSegmentWithOvershadowedStatus; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.jboss.netty.handler.codec.http.HttpMethod; import javax.annotation.Nullable; @@ -100,7 +100,7 @@ public class SystemSchema extends AbstractSchema private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; - private static final Function> + private static final Function> SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment -> Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply( segment.getDataSegment().getDataSource()) @@ -269,7 +269,7 @@ public Enumerable scan(DataContext root) } //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator - final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); + final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); final Set segmentsAlreadySeen = new HashSet<>(); @@ -354,15 +354,15 @@ public Enumerable scan(DataContext root) } - private Iterator getAuthorizedPublishedSegments( - Iterator it, + private Iterator 
getAuthorizedPublishedSegments( + Iterator it, DataContext root ) { final AuthenticationResult authenticationResult = (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); - final Iterable authorizedSegments = AuthorizationUtils + final Iterable authorizedSegments = AuthorizationUtils .filterAuthorizedResources( authenticationResult, () -> it, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index a93552c9efce..49e406b0e6eb 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -83,8 +83,8 @@ import org.apache.druid.sql.calcite.util.TestServerInventoryView; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.DataSegmentWithOvershadowedStatus; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.SegmentWithOvershadowedStatus; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -490,12 +490,12 @@ public void testSegmentsTable() .withConstructor(druidSchema, metadataView, mapper, authMapper) .createMock(); EasyMock.replay(segmentsTable); - final Set publishedSegments = Stream.of( - new DataSegmentWithOvershadowedStatus(publishedSegment1, true), - new DataSegmentWithOvershadowedStatus(publishedSegment2, false), - new DataSegmentWithOvershadowedStatus(publishedSegment3, false), - new DataSegmentWithOvershadowedStatus(segment1, true), - new DataSegmentWithOvershadowedStatus(segment2, false) + final Set publishedSegments = Stream.of( + new SegmentWithOvershadowedStatus(publishedSegment1, true), + new SegmentWithOvershadowedStatus(publishedSegment2, false), + new SegmentWithOvershadowedStatus(publishedSegment3, 
false), + new SegmentWithOvershadowedStatus(segment1, true), + new SegmentWithOvershadowedStatus(segment2, false) ).collect(Collectors.toSet()); EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once(); From 8d2879eab9ecc39a4534b649255a2288a2120122 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 28 May 2019 17:21:47 -0700 Subject: [PATCH 06/18] PR comments --- .../SegmentWithOvershadowedStatus.java | 7 ++ .../druid/client/DataSourceSnapshot.java | 90 +++++++++++++++++++ .../metadata/MetadataSegmentManager.java | 5 +- .../metadata/SQLMetadataSegmentManager.java | 68 +++++--------- .../helper/DruidCoordinatorRuleRunner.java | 8 +- .../druid/server/http/MetadataResource.java | 6 +- .../DruidCoordinatorRuleRunnerTest.java | 2 +- 7 files changed, 129 insertions(+), 57 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java index 7074e1dc0c7e..7a24ed307d70 100644 --- a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java +++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonUnwrapped; /** * DataSegment object plus the overshadowed status for the segment. An immutable object. @@ -31,6 +32,12 @@ public class SegmentWithOvershadowedStatus implements Comparable { private final boolean overshadowed; + /** + * dataSegment is serialized "unwrapped", i.e. its properties are included as properties of + * the enclosing class. If, in the future, {@code SegmentWithOvershadowedStatus} were to extend {@link DataSegment}, + * there will be no change in the serialized format.
+ */ + @JsonUnwrapped private final DataSegment dataSegment; @JsonCreator diff --git a/server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java new file mode 100644 index 000000000000..586507dcee72 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Ordering; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.VersionedIntervalTimeline; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * An immutable snapshot of fields from {@link org.apache.druid.metadata.SQLMetadataSegmentManager} (dataSources and + * overshadowedSegments). Getters of {@link org.apache.druid.metadata.MetadataSegmentManager} should use this snapshot + * to return dataSources and overshadowedSegments. 
+ */ +public class DataSourceSnapshot +{ + private final Collection dataSources; + + public DataSourceSnapshot( + Collection dataSources + ) + { + this.dataSources = dataSources; + } + + public Collection getDataSources() + { + return dataSources; + } + + public ImmutableSet getOvershadowedSegments() + { + return ImmutableSet.copyOf(determineOvershadowedSegments()); + } + + /** + * This method builds timelines from all dataSources and finds the overshadowed segments list + * + * @return overshadowed segment Ids list + */ + private List determineOvershadowedSegments() + { + + final List segments = dataSources.stream() + .flatMap(ds -> ds.getSegments().stream()) + .collect(Collectors.toList()); + final Map> timelines = new HashMap<>(); + segments.forEach(segment -> timelines + .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) + .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); + + // It's fine to add all overshadowed segments to a single collection because only + // a small fraction of the segments in the cluster are expected to be overshadowed, + // so building this collection shouldn't generate a lot of garbage. 
+ final List overshadowedSegments = new ArrayList<>(); + for (DataSegment dataSegment : segments) { + final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); + if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { + overshadowedSegments.add(dataSegment.getId()); + } + } + return overshadowedSegments; + } + +} diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java index be2c5326d25f..8ad762e82e04 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.List; +import java.util.Set; /** */ @@ -99,13 +100,13 @@ public interface MetadataSegmentManager Collection getAllDataSourceNames(); /** - * Returns a collection of overshadowed segments + * Returns a set of overshadowed segment Ids * * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has * not yet been polled.) */ @Nullable - Collection getOvershadowedSegments(); + Set getOvershadowedSegments(); /** * Returns top N unused segment intervals in given interval when ordered by segment start time, end time. 
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index 9dd964d794ce..bc6c8be07a75 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -22,10 +22,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; import com.google.inject.Inject; +import org.apache.druid.client.DataSourceSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.guice.ManageLifecycle; @@ -62,7 +61,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -113,7 +111,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @Nullable private volatile ConcurrentHashMap dataSources = null; @Nullable - private volatile ImmutableSet overshadowedSegments = null; + private volatile DataSourceSnapshot dataSourcesSnapshot = null; /** * The number of times this SQLMetadataSegmentManager was started. 
@@ -214,7 +212,6 @@ public void stop() } dataSources = null; - overshadowedSegments = null; currentStartOrder = -1; exec.shutdownNow(); exec = null; @@ -456,6 +453,8 @@ public boolean removeDataSource(final String dataSource) ).bind("dataSource", dataSource).execute() ); + Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource)); + if (removed == 0) { return false; } @@ -620,35 +619,27 @@ public ImmutableDruidDataSource getDataSource(String dataSourceName) @Nullable public Collection getDataSources() { - return Optional.ofNullable(dataSources) - .map(m -> - m.values() - .stream() - .map(DruidDataSource::toImmutableDruidDataSource) - .collect(Collectors.toList()) - ) - .orElse(null); + return dataSourcesSnapshot.getDataSources(); } @Override @Nullable public Iterable iterateAllSegments() { - final ConcurrentHashMap dataSourcesSnapshot = dataSources; - if (dataSourcesSnapshot == null) { + final Collection snapshot = dataSourcesSnapshot.getDataSources(); + if (snapshot == null) { return null; } - return () -> dataSourcesSnapshot.values() - .stream() + return () -> snapshot.stream() .flatMap(dataSource -> dataSource.getSegments().stream()) .iterator(); } @Override - public Set getOvershadowedSegments() + public Set getOvershadowedSegments() { - return overshadowedSegments; + return dataSourcesSnapshot.getOvershadowedSegments(); } @Override @@ -755,35 +746,18 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE // Replace "dataSources" atomically. 
dataSources = newDataSources; - // the overshadowedSegments could become invalid if dataSources is updated outside of - // this method, currently both dataSources and overshadowedSegments are updated here - // and in stop() - overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments(segments)); - } - /** - * This method builds timelines from given segments and finds the overshadowed segments list - * - * @return overshadowed segments list - */ - private List determineOvershadowedSegments(Iterable segments) - { - final Map> timelines = new HashMap<>(); - segments.forEach(segment -> timelines - .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) - .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); - - // It's fine to add all overshadowed segments to a single collection because only - // a small fraction of the segments in the cluster are expected to be overshadowed, - // so building this collection shouldn't generate a lot of garbage. 
- final List overshadowedSegments = new ArrayList<>(); - for (DataSegment dataSegment : segments) { - final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); - if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { - overshadowedSegments.add(dataSegment); - } - } - return overshadowedSegments; + // replace "dataSourcesSnapshot" atomically + dataSourcesSnapshot = new DataSourceSnapshot( + Optional.ofNullable(dataSources) + .map(m -> + m.values() + .stream() + .map(DruidDataSource::toImmutableDruidDataSource) + .collect(Collectors.toList()) + ) + .orElse(null) + ); } /** diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index 5618b78477d2..2fbe84061608 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -33,10 +33,10 @@ import org.apache.druid.timeline.SegmentId; import org.joda.time.DateTime; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; /** */ @@ -86,9 +86,9 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // only those would need to be loaded/dropped // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed // If metadata store hasn't been polled yet, use empty overshadowed list - final Collection overshadowed = Optional + final Set overshadowed = Optional .ofNullable(coordinator.getMetadataSegmentManager().getOvershadowedSegments()) - .orElse(Collections.emptyList()); + .orElse(Collections.emptySet()); for (String tier : cluster.getTierNames()) { replicatorThrottler.updateReplicationState(tier); @@ -106,7 
+106,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final List segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES); int missingRules = 0; for (DataSegment segment : params.getAvailableSegments()) { - if (overshadowed.contains(segment)) { + if (overshadowed.contains(segment.getId())) { // Skipping overshadowed segments continue; } diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 279bb7a40642..556ed3d8dd49 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -192,14 +192,14 @@ private Iterable findAuthorizedSegmentWithOversha ) { // If metadata store hasn't been polled yet, use empty overshadowed list - final Collection overshadowedSegments = Optional + final Set overshadowedSegments = Optional .ofNullable(metadataSegmentManager.getOvershadowedSegments()) - .orElse(Collections.emptyList()); + .orElse(Collections.emptySet()); final Stream segmentsWithOvershadowedStatus = metadataSegments .map(segment -> new SegmentWithOvershadowedStatus( segment, - overshadowedSegments.contains(segment) + overshadowedSegments.contains(segment.getId()) )); final Function> raGenerator = segment -> Collections diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 4d60bfb15571..3316f46cdce1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -1341,7 +1341,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() availableSegments.add(v2); 
EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); - EasyMock.expect(databaseSegmentManager.getOvershadowedSegments()).andReturn(ImmutableSet.of(v1)).anyTimes(); + EasyMock.expect(databaseSegmentManager.getOvershadowedSegments()).andReturn(ImmutableSet.of(v1.getId())).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); EasyMock.replay(coordinator, databaseSegmentManager); mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject()); From a8af2187cb5030d363b6f55c43586df039b60f2a Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 28 May 2019 17:32:58 -0700 Subject: [PATCH 07/18] fix indentation --- .../main/java/org/apache/druid/client/DataSourceSnapshot.java | 1 - .../org/apache/druid/metadata/SQLMetadataSegmentManager.java | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java index 586507dcee72..51d041b56849 100644 --- a/server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java +++ b/server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java @@ -65,7 +65,6 @@ public ImmutableSet getOvershadowedSegments() */ private List determineOvershadowedSegments() { - final List segments = dataSources.stream() .flatMap(ds -> ds.getSegments().stream()) .collect(Collectors.toList()); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index bc6c8be07a75..12f18e8362ae 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -632,8 +632,8 @@ public Iterable iterateAllSegments() } return () -> snapshot.stream() - .flatMap(dataSource -> 
dataSource.getSegments().stream()) - .iterator(); + .flatMap(dataSource -> dataSource.getSegments().stream()) + .iterator(); } @Override From 90e445428554734f006f5b895b1bad13543a86ee Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 28 May 2019 23:52:03 -0700 Subject: [PATCH 08/18] fix tests --- .../SegmentWithOvershadowedStatus.java | 2 +- ...Snapshot.java => DataSourcesSnapshot.java} | 8 +++--- .../metadata/SQLMetadataSegmentManager.java | 25 +++++++++++-------- 3 files changed, 20 insertions(+), 15 deletions(-) rename server/src/main/java/org/apache/druid/client/{DataSourceSnapshot.java => DataSourcesSnapshot.java} (93%) diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java index 7a24ed307d70..3f2972fd07e9 100644 --- a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java +++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java @@ -26,7 +26,7 @@ /** * DataSegment object plus the overshadowed status for the segment. An immutable object. * - * DataSegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId} + * SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId} * of the DataSegment object. 
*/ public class SegmentWithOvershadowedStatus implements Comparable diff --git a/server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java similarity index 93% rename from server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java rename to server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java index 51d041b56849..26028f5cc47d 100644 --- a/server/src/main/java/org/apache/druid/client/DataSourceSnapshot.java +++ b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java @@ -37,15 +37,17 @@ * overshadowedSegments). Getters of {@link org.apache.druid.metadata.MetadataSegmentManager} should use this snapshot * to return dataSources and overshadowedSegments. */ -public class DataSourceSnapshot +public class DataSourcesSnapshot { private final Collection dataSources; + private final ImmutableSet overshadowedSegments; - public DataSourceSnapshot( + public DataSourcesSnapshot( Collection dataSources ) { this.dataSources = dataSources; + this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments()); } public Collection getDataSources() @@ -55,7 +57,7 @@ public Collection getDataSources() public ImmutableSet getOvershadowedSegments() { - return ImmutableSet.copyOf(determineOvershadowedSegments()); + return overshadowedSegments; } /** diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index 12f18e8362ae..fd2dc8f6cd50 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.inject.Inject; -import org.apache.druid.client.DataSourceSnapshot; +import 
org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.guice.ManageLifecycle; @@ -111,7 +111,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager @Nullable private volatile ConcurrentHashMap dataSources = null; @Nullable - private volatile DataSourceSnapshot dataSourcesSnapshot = null; + private volatile DataSourcesSnapshot dataSourcesSnapshot = null; /** * The number of times this SQLMetadataSegmentManager was started. @@ -210,7 +210,7 @@ public void stop() if (!isStarted()) { return; } - + dataSourcesSnapshot = null; dataSources = null; currentStartOrder = -1; exec.shutdownNow(); @@ -619,27 +619,30 @@ public ImmutableDruidDataSource getDataSource(String dataSourceName) @Nullable public Collection getDataSources() { - return dataSourcesSnapshot.getDataSources(); + return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getDataSources()).orElse(null); } @Override @Nullable public Iterable iterateAllSegments() { - final Collection snapshot = dataSourcesSnapshot.getDataSources(); - if (snapshot == null) { + final Collection dataSources = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSources()) + .orElse(null); + if (dataSources == null) { return null; } - return () -> snapshot.stream() - .flatMap(dataSource -> dataSource.getSegments().stream()) - .iterator(); + return () -> dataSources.stream() + .flatMap(dataSource -> dataSource.getSegments().stream()) + .iterator(); } @Override + @Nullable public Set getOvershadowedSegments() { - return dataSourcesSnapshot.getOvershadowedSegments(); + return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getOvershadowedSegments()).orElse(null); } @Override @@ -748,7 +751,7 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE dataSources = newDataSources; // replace "dataSourcesSnapshot" atomically - dataSourcesSnapshot = new 
DataSourceSnapshot( + dataSourcesSnapshot = new DataSourcesSnapshot( Optional.ofNullable(dataSources) .map(m -> m.values() From 2bb7d796e2b3766c8e576075ef3e5245b2075c3a Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 29 May 2019 10:50:19 -0700 Subject: [PATCH 09/18] fix test --- .../test/resources/results/auth_test_sys_schema_segments.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json index f2046dedf3a6..4437e725e28b 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json @@ -13,6 +13,6 @@ "is_available": 1, "is_realtime": 0, "is_overshadowed": 0, - "payload": "{\"dataSegment\":{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"},\"overshadowed\":false}" + "payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap 
space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",\"overshadowed\":false}" } ] From 1e2f28eace6e2792ff47735e49903afbecf47c26 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 29 May 2019 12:36:50 -0700 Subject: [PATCH 10/18] add test for SegmentWithOvershadowedStatus serde format --- .../SegmentWithOvershadowedStatusTest.java | 180 ++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java diff --git a/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java new file mode 100644 index 000000000000..40b22a48dd0b --- /dev/null +++ b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.TestObjectMapper; +import org.apache.druid.jackson.CommaListJoinDeserializer; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.timeline.partition.NoneShardSpec; +import org.apache.druid.timeline.partition.ShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class SegmentWithOvershadowedStatusTest +{ + private static final ObjectMapper mapper = new TestObjectMapper(); + private static final int TEST_VERSION = 0x9; + + @Before + public void setUp() + { + InjectableValues.Std injectableValues = new InjectableValues.Std(); + injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + mapper.setInjectableValues(injectableValues); + } + + @Test + public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws Exception + { + final Interval interval = Intervals.of("2011-10-01/2011-10-02"); + final ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); + + final DataSegment dataSegment = new DataSegment( + "something", + interval, + "1", + loadSpec, + Arrays.asList("dim1", "dim2"), + Arrays.asList("met1", "met2"), + NoneShardSpec.instance(), + TEST_VERSION, + 1 + ); + + final SegmentWithOvershadowedStatus segment = new 
SegmentWithOvershadowedStatus(dataSegment, false); + + final Map objectMap = mapper.readValue( + mapper.writeValueAsString(segment), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + + Assert.assertEquals(11, objectMap.size()); + Assert.assertEquals("something", objectMap.get("dataSource")); + Assert.assertEquals(interval.toString(), objectMap.get("interval")); + Assert.assertEquals("1", objectMap.get("version")); + Assert.assertEquals(loadSpec, objectMap.get("loadSpec")); + Assert.assertEquals("dim1,dim2", objectMap.get("dimensions")); + Assert.assertEquals("met1,met2", objectMap.get("metrics")); + Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec")); + Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion")); + Assert.assertEquals(1, objectMap.get("size")); + Assert.assertEquals(false, objectMap.get("overshadowed")); + + final String json = mapper.writeValueAsString(segment); + + final TestSegmentWithOvershadowedStatus deserializedSegment = mapper.readValue( + json, + TestSegmentWithOvershadowedStatus.class + ); + + Assert.assertEquals(segment.getDataSegment().getDataSource(), deserializedSegment.getDataSource()); + Assert.assertEquals(segment.getDataSegment().getInterval(), deserializedSegment.getInterval()); + Assert.assertEquals(segment.getDataSegment().getVersion(), deserializedSegment.getVersion()); + Assert.assertEquals(segment.getDataSegment().getLoadSpec(), deserializedSegment.getLoadSpec()); + Assert.assertEquals(segment.getDataSegment().getDimensions(), deserializedSegment.getDimensions()); + Assert.assertEquals(segment.getDataSegment().getMetrics(), deserializedSegment.getMetrics()); + Assert.assertEquals(segment.getDataSegment().getShardSpec(), deserializedSegment.getShardSpec()); + Assert.assertEquals(segment.getDataSegment().getSize(), deserializedSegment.getSize()); + Assert.assertEquals(segment.getDataSegment().getId(), deserializedSegment.getId()); + + } +} + +/** + * Subclass of DataSegment with 
overshadowed status + */ +class TestSegmentWithOvershadowedStatus extends DataSegment +{ + private final boolean overshadowed; + + @JsonCreator + public TestSegmentWithOvershadowedStatus( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("version") String version, + @JsonProperty("loadSpec") @Nullable Map loadSpec, + @JsonProperty("dimensions") + @JsonDeserialize(using = CommaListJoinDeserializer.class) + @Nullable + List dimensions, + @JsonProperty("metrics") + @JsonDeserialize(using = CommaListJoinDeserializer.class) + @Nullable + List metrics, + @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec, + @JsonProperty("binaryVersion") Integer binaryVersion, + @JsonProperty("size") long size, + @JsonProperty("overshadowed") boolean overshadowed + ) + { + super( + dataSource, + interval, + version, + loadSpec, + dimensions, + metrics, + shardSpec, + binaryVersion, + size + ); + this.overshadowed = overshadowed; + } + + @JsonProperty + public boolean isOvershadowed() + { + return overshadowed; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof TestSegmentWithOvershadowedStatus)) { + return false; + } + if (!super.equals(o)) { + return false; + } + final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o; + if (overshadowed != (that.overshadowed)) { + return false; + } + return true; + } + +} From bae854c89b68bdbc42c5139039e98611de80089b Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 29 May 2019 21:51:43 -0700 Subject: [PATCH 11/18] PR comments --- .../SegmentWithOvershadowedStatusTest.java | 2 +- .../druid/client/DataSourcesSnapshot.java | 29 ++++- .../metadata/MetadataSegmentManager.java | 8 ++ .../metadata/SQLMetadataSegmentManager.java | 115 ++++++++++-------- .../server/coordinator/DruidCoordinator.java | 25 +++- .../helper/DruidCoordinatorRuleRunner.java | 6 +- 
.../DruidCoordinatorRuleRunnerTest.java | 22 ++-- .../coordinator/DruidCoordinatorTest.java | 11 +- 8 files changed, 144 insertions(+), 74 deletions(-) diff --git a/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java index 40b22a48dd0b..050f9e04934b 100644 --- a/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java +++ b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java @@ -73,7 +73,7 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E ); final SegmentWithOvershadowedStatus segment = new SegmentWithOvershadowedStatus(dataSegment, false); - + final Map objectMap = mapper.readValue( mapper.writeValueAsString(segment), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT diff --git a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java index 26028f5cc47d..84176621e4b7 100644 --- a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java +++ b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java @@ -25,6 +25,7 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -39,11 +40,11 @@ */ public class DataSourcesSnapshot { - private final Collection dataSources; + private final Map dataSources; private final ImmutableSet overshadowedSegments; public DataSourcesSnapshot( - Collection dataSources + Map dataSources ) { this.dataSources = dataSources; @@ -51,15 +52,37 @@ public DataSourcesSnapshot( } public Collection getDataSources() + { + return dataSources.values(); + } + + public Map getDataSourcesMap() { return dataSources; } + @Nullable + public ImmutableDruidDataSource 
getDataSource(String dataSourceName) + { + return dataSources.get(dataSourceName); + } + public ImmutableSet getOvershadowedSegments() { return overshadowedSegments; } + @Nullable + public Iterable iterateAllSegmentsInSnapshot() + { + if (dataSources == null) { + return null; + } + return () -> dataSources.values().stream() + .flatMap(dataSource -> dataSource.getSegments().stream()) + .iterator(); + } + /** * This method builds timelines from all dataSources and finds the overshadowed segments list * @@ -67,7 +90,7 @@ public ImmutableSet getOvershadowedSegments() */ private List determineOvershadowedSegments() { - final List segments = dataSources.stream() + final List segments = dataSources.values().stream() .flatMap(ds -> ds.getSegments().stream()) .collect(Collectors.toList()); final Map> timelines = new HashMap<>(); diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java index 8ad762e82e04..e2703645708e 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -108,6 +109,13 @@ public interface MetadataSegmentManager @Nullable Set getOvershadowedSegments(); + /** + * Returns a snapshot of DruidDataSources and overshadowed segments + * + */ + @Nullable + DataSourcesSnapshot getDataSourcesSnapshot(); + /** * Returns top N unused segment intervals in given interval when ordered by segment start time, end time. 
*/ diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index fd2dc8f6cd50..349f5f665e84 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -71,6 +71,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; import java.util.stream.Collectors; /** @@ -105,12 +106,9 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private final SQLMetadataConnector connector; // Volatile since this reference is reassigned in "poll" and then read from in other threads. - // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map). // Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between // null and nonnull multiple times as stop() and start() are called. 
@Nullable - private volatile ConcurrentHashMap dataSources = null; - @Nullable private volatile DataSourcesSnapshot dataSourcesSnapshot = null; /** @@ -211,7 +209,6 @@ public void stop() return; } dataSourcesSnapshot = null; - dataSources = null; currentStartOrder = -1; exec.shutdownNow(); exec = null; @@ -453,7 +450,11 @@ public boolean removeDataSource(final String dataSource) ).bind("dataSource", dataSource).execute() ); - Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource)); + if (dataSourcesSnapshot != null) { + final Map dataSourcesMap = dataSourcesSnapshot.getDataSourcesMap(); + Optional.ofNullable(dataSourcesMap).ifPresent(m -> m.remove(dataSource)); + replaceDataSourcesSnapshot(dataSourcesMap); + } if (removed == 0) { return false; @@ -476,22 +477,22 @@ public boolean removeSegment(String dataSourceName, final String segmentId) // Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a // potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast. List possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId); - Optional.ofNullable(dataSources).ifPresent( - m -> - m.computeIfPresent( - dataSourceName, - (dsName, dataSource) -> { - for (SegmentId possibleSegmentId : possibleSegmentIds) { - if (dataSource.removeSegment(possibleSegmentId) != null) { - break; - } - } - // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. - //noinspection ReturnOfNull - return dataSource.isEmpty() ? 
null : dataSource; - } - ) - ); + if (dataSourcesSnapshot != null) { + final Map dataSourcesMap = dataSourcesSnapshot.getDataSourcesMap(); + BiFunction fn = (dsName, dataSource) -> { + for (SegmentId possibleSegmentId : possibleSegmentIds) { + DruidDataSource druidDataSource = new DruidDataSource(dataSource.getName(), dataSource.getProperties()); + if (druidDataSource.removeSegment(possibleSegmentId) != null) { + dataSource = druidDataSource.toImmutableDruidDataSource(); + break; + } + } + return dataSource; + }; + + Optional.ofNullable(dataSourcesMap).ifPresent(m -> m.computeIfPresent(dataSourceName, fn)); + replaceDataSourcesSnapshot(dataSourcesMap); + } return removed; } @@ -506,18 +507,18 @@ public boolean removeSegment(SegmentId segmentId) { try { final boolean removed = removeSegmentFromTable(segmentId.toString()); - Optional.ofNullable(dataSources).ifPresent( - m -> - m.computeIfPresent( - segmentId.getDataSource(), - (dsName, dataSource) -> { - dataSource.removeSegment(segmentId); - // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. - //noinspection ReturnOfNull - return dataSource.isEmpty() ? 
null : dataSource; - } - ) - ); + if (dataSourcesSnapshot != null) { + final Map dataSourcesMap = dataSourcesSnapshot.getDataSourcesMap(); + BiFunction fn = (dsName, dataSource) -> { + DruidDataSource druidDataSource = new DruidDataSource(dataSource.getName(), dataSource.getProperties()); + if (druidDataSource.removeSegment(segmentId) != null) { + dataSource = druidDataSource.toImmutableDruidDataSource(); + } + return dataSource; + }; + + Optional.ofNullable(dataSourcesMap).ifPresent(m -> m.computeIfPresent(segmentId.getDataSource(), fn)); + } return removed; } catch (Exception e) { @@ -611,8 +612,10 @@ public boolean isStarted() @Nullable public ImmutableDruidDataSource getDataSource(String dataSourceName) { - final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null); - return dataSource == null ? null : dataSource.toImmutableDruidDataSource(); + final ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSourcesMap().get(dataSourceName)) + .orElse(null); + return dataSource == null ? null : dataSource; } @Override @@ -645,6 +648,13 @@ public Set getOvershadowedSegments() return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getOvershadowedSegments()).orElse(null); } + @Nullable + @Override + public DataSourcesSnapshot getDataSourcesSnapshot() + { + return dataSourcesSnapshot; + } + @Override public Collection getAllDataSourceNames() { @@ -747,26 +757,27 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE .addSegmentIfAbsent(segment); }); - // Replace "dataSources" atomically. 
- dataSources = newDataSources; - - // replace "dataSourcesSnapshot" atomically - dataSourcesSnapshot = new DataSourcesSnapshot( - Optional.ofNullable(dataSources) - .map(m -> - m.values() - .stream() - .map(DruidDataSource::toImmutableDruidDataSource) - .collect(Collectors.toList()) - ) - .orElse(null) - ); + + replaceDataSourcesSnapshot(newDataSources.entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> e.getValue().toImmutableDruidDataSource() + ))); + } + + /** + * replace "dataSourcesSnapshot" atomically + */ + private void replaceDataSourcesSnapshot(Map dataSources) + { + dataSourcesSnapshot = new DataSourcesSnapshot(dataSources); } /** * For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough * (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all - * existing segments each time, and then replace them in {@link #dataSources}. This method allows to use already + * existing segments each time, and then replace them in {@link #dataSourcesSnapshot}. This method allows to use already * existing (old) segments when possible, effectively interning them a-la {@link String#intern} or {@link * com.google.common.collect.Interner}, aiming to make the majority of {@link DataSegment} objects garbage soon after * they are deserialized and to die in young generation. 
It allows to avoid fragmentation of the old generation and @@ -774,7 +785,9 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE */ private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment) { - DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null); + ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSourcesMap().get(segment.getDataSource())) + .orElse(null); if (dataSource == null) { return segment; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index a1c3caad3ea9..9521f3bf5df7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -29,6 +29,7 @@ import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ZKPaths; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; @@ -82,6 +83,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -145,6 +147,7 @@ public class DruidCoordinator private volatile boolean started = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; + private final DataSourcesSnapshot dataSourcesSnapshot; @Inject public DruidCoordinator( @@ -233,6 +236,7 @@ public DruidCoordinator( this.coordLeaderSelector = coordLeaderSelector; this.segmentCompactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient); + 
this.dataSourcesSnapshot = metadataSegmentManager.getDataSourcesSnapshot(); } public boolean isLeader() @@ -245,9 +249,9 @@ public Map getLoadManagementPeons() return loadManagementPeons; } - public MetadataSegmentManager getMetadataSegmentManager() + public DataSourcesSnapshot getDataSourcesSnapshot() { - return metadataSegmentManager; + return metadataSegmentManager.getDataSourcesSnapshot(); } /** @@ -321,7 +325,9 @@ public Object2LongMap getSegmentAvailability() public Map getLoadStatus() { final Map loadStatus = new HashMap<>(); - final Collection dataSources = metadataSegmentManager.getDataSources(); + final Collection dataSources = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSources()) + .orElse(null); if (dataSources == null) { return loadStatus; @@ -398,7 +404,9 @@ public void moveSegment( throw new IAE("Cannot move [%s] to and from the same server [%s]", segmentId, fromServer.getName()); } - ImmutableDruidDataSource dataSource = metadataSegmentManager.getDataSource(segment.getDataSource()); + ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSource(segment.getDataSource())) + .orElse(null); if (dataSource == null) { throw new IAE("Unable to find dataSource for segment [%s] in metadata", segmentId); } @@ -488,7 +496,10 @@ public void moveSegment( @Nullable public Iterable iterateAvailableDataSegments() { - return metadataSegmentManager.iterateAllSegments(); + final Iterable dataSources = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.iterateAllSegmentsInSnapshot()) + .orElse(null); + return dataSources == null ? null : dataSources; } @LifecycleStart @@ -675,7 +686,9 @@ public void run() BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); // Do coordinator stuff. 
- final Collection dataSources = metadataSegmentManager.getDataSources(); + final Collection dataSources = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSources()) + .orElse(null); if (dataSources == null) { log.info("Metadata store not polled yet, skipping this run."); return; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index 2fbe84061608..f1d69d93c1d2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator.helper; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -33,7 +34,6 @@ import org.apache.druid.timeline.SegmentId; import org.joda.time.DateTime; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; @@ -87,8 +87,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed // If metadata store hasn't been polled yet, use empty overshadowed list final Set overshadowed = Optional - .ofNullable(coordinator.getMetadataSegmentManager().getOvershadowedSegments()) - .orElse(Collections.emptySet()); + .ofNullable(coordinator.getDataSourcesSnapshot().getOvershadowedSegments()) + .orElse(ImmutableSet.of()); for (String tier : cluster.getTierNames()) { replicatorThrottler.updateReplicationState(tier); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 3316f46cdce1..61d4883cfd48 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -70,6 +71,7 @@ public class DruidCoordinatorRuleRunnerTest private ServiceEmitter emitter; private MetadataRuleManager databaseRuleManager; private MetadataSegmentManager databaseSegmentManager; + private DataSourcesSnapshot dataSourcesSnapshot; @Before public void setUp() @@ -80,6 +82,7 @@ public void setUp() EmittingLogger.registerEmitter(emitter); databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class); + dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class); DateTime start = DateTimes.of("2012-01-01"); availableSegments = new ArrayList<>(); @@ -561,7 +564,7 @@ public void testDropRemove() EasyMock.expectLastCall().atLeastOnce(); mockEmptyPeon(); - EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); + EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); @@ -994,10 +997,10 @@ public void testDropServerActuallyServesSegment() @Test public void 
testReplicantThrottle() { - EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); - EasyMock.expect(databaseSegmentManager.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); + EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); + EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); - EasyMock.replay(coordinator, databaseSegmentManager); + EasyMock.replay(coordinator, databaseSegmentManager, dataSourcesSnapshot); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); mockEmptyPeon(); @@ -1122,7 +1125,9 @@ public void testReplicantThrottleAcrossTiers() .build() ) .atLeastOnce(); - EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); + EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); + EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); + EasyMock.replay(dataSourcesSnapshot); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); EasyMock.replay(coordinator); @@ -1340,9 +1345,10 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() availableSegments.add(v1); availableSegments.add(v2); - EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); - EasyMock.expect(databaseSegmentManager.getOvershadowedSegments()).andReturn(ImmutableSet.of(v1.getId())).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); + EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); + 
EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); + EasyMock.replay(dataSourcesSnapshot); EasyMock.replay(coordinator, databaseSegmentManager); mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject()); EasyMock.expectLastCall().once(); @@ -1407,7 +1413,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() private void mockCoordinator() { - EasyMock.expect(coordinator.getMetadataSegmentManager()).andReturn(databaseSegmentManager).anyTimes(); + EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 5ff127504957..1abc0f739e2f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -29,6 +29,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; @@ -99,6 +100,7 @@ public class DruidCoordinatorTest extends CuratorTestBase private ObjectMapper objectMapper; private DruidNode druidNode; private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter(); + private DataSourcesSnapshot dataSourcesSnapshot; @Before public void setUp() throws Exception @@ -106,8 +108,11 @@ public void setUp() throws Exception druidServer = 
EasyMock.createMock(DruidServer.class); serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class); databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class); + dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class); metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class); JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class); + EasyMock.expect(databaseSegmentManager.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot); + EasyMock.replay(databaseSegmentManager); EasyMock.expect( configManager.watch( EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY), @@ -245,8 +250,8 @@ public void testMoveSegment() ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class); EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(segment); EasyMock.replay(druidDataSource); - EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource); - EasyMock.replay(databaseSegmentManager); + EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource); + EasyMock.replay(dataSourcesSnapshot); scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); EasyMock.replay(scheduledExecutorFactory); EasyMock.replay(metadataRuleManager); @@ -530,6 +535,8 @@ public void testCoordinatorTieredRun() throws Exception private void setupMetadataSegmentManagerMock(DruidDataSource dataSource) { + EasyMock.expect(databaseSegmentManager.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot); + EasyMock.replay(databaseSegmentManager); EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes(); EasyMock .expect(databaseSegmentManager.iterateAllSegments()) From c6e88aa54ea0f71d292402be4ad845a87df4fda5 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 30 May 2019 22:26:12 -0700 Subject: [PATCH 12/18] PR 
comments --- .../server/coordinator/DruidCoordinator.java | 21 ++++++++++++------ .../DruidCoordinatorRuntimeParams.java | 22 +++++++++++++++++-- .../helper/DruidCoordinatorRuleRunner.java | 11 +++++++--- .../DruidCoordinatorRuleRunnerTest.java | 12 +++------- .../coordinator/DruidCoordinatorTest.java | 19 ++++++---------- 5 files changed, 52 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 9521f3bf5df7..fbcb9104aace 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -147,7 +148,10 @@ public class DruidCoordinator private volatile boolean started = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; - private final DataSourcesSnapshot dataSourcesSnapshot; + /** + * set in {@link CoordinatorRunnable#run()} at start of every coordinator run + */ + private volatile DataSourcesSnapshot dataSourcesSnapshot = null; @Inject public DruidCoordinator( @@ -236,7 +240,6 @@ public DruidCoordinator( this.coordLeaderSelector = coordLeaderSelector; this.segmentCompactor = new DruidCoordinatorSegmentCompactor(indexingServiceClient); - this.dataSourcesSnapshot = metadataSegmentManager.getDataSourcesSnapshot(); } public boolean isLeader() @@ -249,11 +252,6 @@ public Map getLoadManagementPeons() return loadManagementPeons; } - public DataSourcesSnapshot getDataSourcesSnapshot() - { - return metadataSegmentManager.getDataSourcesSnapshot(); - } - /** * @return tier -> { dataSource -> underReplicationCount } map */ @@ -384,6 +382,12 
@@ public String getCurrentLeader() return coordLeaderSelector.getCurrentLeader(); } + @VisibleForTesting + void setDataSourcesSnapshotForTest(DataSourcesSnapshot snapshot) + { + dataSourcesSnapshot = snapshot; + } + public void moveSegment( ImmutableDruidServer fromServer, ImmutableDruidServer toServer, @@ -686,9 +690,11 @@ public void run() BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); // Do coordinator stuff. + dataSourcesSnapshot = metadataSegmentManager.getDataSourcesSnapshot(); final Collection dataSources = Optional.ofNullable(dataSourcesSnapshot) .map(m -> m.getDataSources()) .orElse(null); + if (dataSources == null) { log.info("Metadata store not polled yet, skipping this run."); return; @@ -702,6 +708,7 @@ public void run() .withCompactionConfig(getCompactionConfig()) .withEmitter(emitter) .withBalancerStrategy(balancerStrategy) + .withDataSourcesSnapshot(dataSourcesSnapshot) .build(); for (DruidCoordinatorHelper helper : helpers) { // Don't read state and run state in the same helper otherwise racy conditions may exist diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 655d6bdf22ff..de75bce9d9c2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -66,6 +67,7 @@ public static TreeSet createAvailableSegmentsSet(Iterable coordinatorDynamicConfig.getMillisToWaitBeforeDeleting()); 
@@ -237,6 +246,7 @@ public static class Builder private CoordinatorStats stats; private DateTime balancerReferenceTimestamp; private BalancerStrategy balancerStrategy; + private DataSourcesSnapshot dataSourcesSnapshot; Builder() { @@ -253,6 +263,7 @@ public static class Builder this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build(); this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty(); this.balancerReferenceTimestamp = DateTimes.nowUtc(); + this.dataSourcesSnapshot = null; } Builder( @@ -304,7 +315,8 @@ public DruidCoordinatorRuntimeParams build() coordinatorCompactionConfig, stats, balancerReferenceTimestamp, - balancerStrategy + balancerStrategy, + dataSourcesSnapshot ); } @@ -434,5 +446,11 @@ public Builder withBalancerStrategy(BalancerStrategy balancerStrategy) this.balancerStrategy = balancerStrategy; return this; } + + public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot) + { + this.dataSourcesSnapshot = snapshot; + return this; + } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index f1d69d93c1d2..7570c81cbe13 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataRuleManager; @@ -86,9 +87,13 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // only those would need to be loaded/dropped // anything overshadowed by served segments is dropped automatically by 
DruidCoordinatorCleanupOvershadowed // If metadata store hasn't been polled yet, use empty overshadowed list - final Set overshadowed = Optional - .ofNullable(coordinator.getDataSourcesSnapshot().getOvershadowedSegments()) - .orElse(ImmutableSet.of()); + final DataSourcesSnapshot dataSourcesSnapshot = params.getDataSourcesSnapshot(); + Set overshadowed = ImmutableSet.of(); + if (dataSourcesSnapshot != null) { + overshadowed = Optional + .ofNullable(dataSourcesSnapshot.getOvershadowedSegments()) + .orElse(ImmutableSet.of()); + } for (String tier : cluster.getTierNames()) { replicatorThrottler.updateReplicationState(tier); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 61d4883cfd48..7d38fa09b8f2 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -564,7 +564,6 @@ public void testDropRemove() EasyMock.expectLastCall().atLeastOnce(); mockEmptyPeon(); - EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); @@ -997,7 +996,6 @@ public void testDropServerActuallyServesSegment() @Test public void testReplicantThrottle() { - EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); EasyMock.replay(coordinator, databaseSegmentManager, dataSourcesSnapshot); @@ -1125,7 +1123,6 @@ public void 
testReplicantThrottleAcrossTiers() .build() ) .atLeastOnce(); - EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); EasyMock.replay(dataSourcesSnapshot); coordinator.removeSegment(EasyMock.anyObject()); @@ -1344,12 +1341,9 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() ); availableSegments.add(v1); availableSegments.add(v2); - + EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of(v1.getId())).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); - EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); - EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); - EasyMock.replay(dataSourcesSnapshot); - EasyMock.replay(coordinator, databaseSegmentManager); + EasyMock.replay(coordinator, dataSourcesSnapshot); mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject()); EasyMock.expectLastCall().once(); mockEmptyPeon(); @@ -1393,6 +1387,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .withDataSourcesSnapshot(dataSourcesSnapshot) .build(); DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); @@ -1413,7 +1408,6 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() private void mockCoordinator() { - EasyMock.expect(coordinator.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 1abc0f739e2f..3df7fc655bc5 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -111,7 +111,8 @@ public void setUp() throws Exception dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class); metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class); JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class); - EasyMock.expect(databaseSegmentManager.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot); + EasyMock.expect(databaseSegmentManager.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); + EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes(); EasyMock.replay(databaseSegmentManager); EasyMock.expect( configManager.watch( @@ -250,7 +251,8 @@ public void testMoveSegment() ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class); EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(segment); EasyMock.replay(druidDataSource); - EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource); + coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot); + EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes(); EasyMock.replay(dataSourcesSnapshot); scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); EasyMock.replay(scheduledExecutorFactory); @@ -535,22 +537,15 @@ public void testCoordinatorTieredRun() throws Exception private void setupMetadataSegmentManagerMock(DruidDataSource dataSource) { - 
EasyMock.expect(databaseSegmentManager.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot); - EasyMock.replay(databaseSegmentManager); - EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes(); EasyMock - .expect(databaseSegmentManager.iterateAllSegments()) + .expect(dataSourcesSnapshot.iterateAllSegmentsInSnapshot()) .andReturn(dataSource.getSegments()) .anyTimes(); EasyMock - .expect(databaseSegmentManager.getDataSources()) + .expect(dataSourcesSnapshot.getDataSources()) .andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource())) .anyTimes(); - EasyMock - .expect(databaseSegmentManager.getAllDataSourceNames()) - .andReturn(Collections.singleton(dataSource.getName())) - .anyTimes(); - EasyMock.replay(databaseSegmentManager); + EasyMock.replay(dataSourcesSnapshot); } @Nullable From 7361f83ed34b5228b092692eb93389a168d5ca67 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 31 May 2019 08:50:26 -0700 Subject: [PATCH 13/18] fix test --- .../server/coordinator/CuratorDruidCoordinatorTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index 53d6d34e7d39..a8faeacc8807 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -32,6 +32,7 @@ import org.apache.druid.client.BatchServerInventoryView; import org.apache.druid.client.CoordinatorSegmentWatcherConfig; import org.apache.druid.client.CoordinatorServerView; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.common.config.JacksonConfigManager; @@ -96,6 +97,7 @@ public class CuratorDruidCoordinatorTest extends 
CuratorTestBase private ObjectMapper objectMapper; private JacksonConfigManager configManager; private DruidNode druidNode; + private DataSourcesSnapshot dataSourcesSnapshot; private static final String SEGPATH = "/druid/segments"; private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1"; private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2"; @@ -127,6 +129,7 @@ public void setUp() throws Exception databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class); metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class); configManager = EasyMock.createNiceMock(JacksonConfigManager.class); + dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class); EasyMock.expect( configManager.watch( EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY), @@ -365,6 +368,9 @@ public void testMoveSegment() throws Exception EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource); EasyMock.replay(databaseSegmentManager); + coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot); + EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes(); + EasyMock.replay(dataSourcesSnapshot); coordinator.moveSegment( source.toImmutableDruidServer(), dest.toImmutableDruidServer(), From fdd8e2cce5788945a0335abcb78be8dab4e7006b Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 3 Jun 2019 11:23:33 -0700 Subject: [PATCH 14/18] remove snapshot updates outside poll --- .../metadata/SQLMetadataSegmentManager.java | 78 +++++-------------- 1 file changed, 19 insertions(+), 59 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index 349f5f665e84..cda1b6364b0d 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -71,7 +71,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.BiFunction; import java.util.stream.Collectors; /** @@ -450,12 +449,6 @@ public boolean removeDataSource(final String dataSource) ).bind("dataSource", dataSource).execute() ); - if (dataSourcesSnapshot != null) { - final Map dataSourcesMap = dataSourcesSnapshot.getDataSourcesMap(); - Optional.ofNullable(dataSourcesMap).ifPresent(m -> m.remove(dataSource)); - replaceDataSourcesSnapshot(dataSourcesMap); - } - if (removed == 0) { return false; } @@ -472,29 +465,7 @@ public boolean removeDataSource(final String dataSource) public boolean removeSegment(String dataSourceName, final String segmentId) { try { - final boolean removed = removeSegmentFromTable(segmentId); - - // Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a - // potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast. 
- List possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId); - if (dataSourcesSnapshot != null) { - final Map dataSourcesMap = dataSourcesSnapshot.getDataSourcesMap(); - BiFunction fn = (dsName, dataSource) -> { - for (SegmentId possibleSegmentId : possibleSegmentIds) { - DruidDataSource druidDataSource = new DruidDataSource(dataSource.getName(), dataSource.getProperties()); - if (druidDataSource.removeSegment(possibleSegmentId) != null) { - dataSource = druidDataSource.toImmutableDruidDataSource(); - break; - } - } - return dataSource; - }; - - Optional.ofNullable(dataSourcesMap).ifPresent(m -> m.computeIfPresent(dataSourceName, fn)); - replaceDataSourcesSnapshot(dataSourcesMap); - } - - return removed; + return removeSegmentFromTable(segmentId); } catch (Exception e) { log.error(e, e.toString()); @@ -506,20 +477,7 @@ public boolean removeSegment(String dataSourceName, final String segmentId) public boolean removeSegment(SegmentId segmentId) { try { - final boolean removed = removeSegmentFromTable(segmentId.toString()); - if (dataSourcesSnapshot != null) { - final Map dataSourcesMap = dataSourcesSnapshot.getDataSourcesMap(); - BiFunction fn = (dsName, dataSource) -> { - DruidDataSource druidDataSource = new DruidDataSource(dataSource.getName(), dataSource.getProperties()); - if (druidDataSource.removeSegment(segmentId) != null) { - dataSource = druidDataSource.toImmutableDruidDataSource(); - } - return dataSource; - }; - - Optional.ofNullable(dataSourcesMap).ifPresent(m -> m.computeIfPresent(segmentId.getDataSource(), fn)); - } - return removed; + return removeSegmentFromTable(segmentId.toString()); } catch (Exception e) { log.error(e, e.toString()); @@ -757,21 +715,23 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE .addSegmentIfAbsent(segment); }); - - replaceDataSourcesSnapshot(newDataSources.entrySet() - .stream() - .collect(Collectors.toMap( - e -> e.getKey(), - e -> 
e.getValue().toImmutableDruidDataSource() - ))); - } - - /** - * replace "dataSourcesSnapshot" atomically - */ - private void replaceDataSourcesSnapshot(Map dataSources) - { - dataSourcesSnapshot = new DataSourcesSnapshot(dataSources); + /** + * dataSourcesSnapshot is updated only here, please note that if datasources or segments are enabled or disabled + * outside of poll, the dataSourcesSnapshot can become invalid until the next poll cycle. + * {@link DataSourcesSnapshot} computes the overshadowed segments, which makes it an expensive operation if the + * snapshot is invalidated on each segment removal, especially if a user issues a lot of single segment remove + * calls in rapid succession. So the snapshot update is not done outside of poll at this time. + * Updates outside of poll(), were primarily for the user experience, so users would immediately see the effect of + * a segment remove call reflected in MetadataResource API calls. These updates outside of schecduled poll may be + * added back in removeDataSource and removeSegment methods after the on-demand polling changes from + * https://github.com/apache/incubator-druid/pull/7653 are in. 
+ */ + dataSourcesSnapshot = new DataSourcesSnapshot(newDataSources.entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> e.getValue().toImmutableDruidDataSource() + ))); } /** From 9090586378043f23df26ad140eea514c39ce505b Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 4 Jun 2019 12:09:16 -0700 Subject: [PATCH 15/18] PR comments --- .../metadata/SQLMetadataSegmentManager.java | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index cda1b6364b0d..276e0ef1098d 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -461,18 +462,22 @@ public boolean removeDataSource(final String dataSource) return true; } + /** + * This method does not update {@code dataSourcesSnapshot}, see the comments in {@code doPoll()} about + * snapshot update. The segment removal will be reflected after next poll cyccle runs. 
+ */ @Override - public boolean removeSegment(String dataSourceName, final String segmentId) + public boolean removeSegment(String dataSourceName, final String identifier) { - try { - return removeSegmentFromTable(segmentId); - } - catch (Exception e) { - log.error(e, e.toString()); - return false; - } + final SegmentId segmentId = SegmentId.tryParse(dataSourceName, identifier); + Preconditions.checkNotNull(segmentId); + return removeSegment(segmentId); } + /** + * This method does not update {@code dataSourcesSnapshot}, see the comments in {@code doPoll()} about + * snapshot update. The segment removal will be reflected after next poll cyccle runs. + */ @Override public boolean removeSegment(SegmentId segmentId) { @@ -715,17 +720,15 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE .addSegmentIfAbsent(segment); }); - /** - * dataSourcesSnapshot is updated only here, please note that if datasources or segments are enabled or disabled - * outside of poll, the dataSourcesSnapshot can become invalid until the next poll cycle. - * {@link DataSourcesSnapshot} computes the overshadowed segments, which makes it an expensive operation if the - * snapshot is invalidated on each segment removal, especially if a user issues a lot of single segment remove - * calls in rapid succession. So the snapshot update is not done outside of poll at this time. - * Updates outside of poll(), were primarily for the user experience, so users would immediately see the effect of - * a segment remove call reflected in MetadataResource API calls. These updates outside of schecduled poll may be - * added back in removeDataSource and removeSegment methods after the on-demand polling changes from - * https://github.com/apache/incubator-druid/pull/7653 are in. 
- */ + // dataSourcesSnapshot is updated only here, please note that if datasources or segments are enabled or disabled + // outside of poll, the dataSourcesSnapshot can become invalid until the next poll cycle. + // DataSourcesSnapshot computes the overshadowed segments, which makes it an expensive operation if the + // snapshot is invalidated on each segment removal, especially if a user issues a lot of single segment remove + // calls in rapid succession. So the snapshot update is not done outside of poll at this time. + // Updates outside of poll(), were primarily for the user experience, so users would immediately see the effect of + // a segment remove call reflected in MetadataResource API calls. These updates outside of scheduled poll may be + // added back in removeDataSource and removeSegment methods after the on-demand polling changes from + // https://github.com/apache/incubator-druid/pull/7653 are in. dataSourcesSnapshot = new DataSourcesSnapshot(newDataSources.entrySet() .stream() .collect(Collectors.toMap( From ed019c3b9da383a086b2e9b6cc80e6f916c3b4b0 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 5 Jun 2019 12:10:22 -0700 Subject: [PATCH 16/18] PR comments --- .../apache/druid/utils/CollectionUtils.java | 14 +++++++ .../MaterializedViewSupervisor.java | 2 +- .../actions/SegmentListActionsTest.java | 2 +- .../metadata/MetadataSegmentManager.java | 9 +---- .../metadata/SQLMetadataSegmentManager.java | 37 ++++++++----------- .../server/coordinator/DruidCoordinator.java | 2 +- .../server/http/DataSourcesResource.java | 2 +- .../SQLMetadataSegmentManagerTest.java | 2 +- 8 files changed, 37 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index 0ba595c85a92..5863f015de7f 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -21,8 
+21,11 @@ import java.util.AbstractCollection; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.Spliterator; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; @@ -68,6 +71,17 @@ public int size() }; } + /** + * Returns a transformed map from the given input map where the value is modified based on the given valueMapper + * function. + */ + public static Map transformValues(Map map, Function valueMapper) + { + final Map result = new HashMap<>(); + map.forEach((k, v) -> result.put(k, valueMapper.apply(v))); + return result; + } + private CollectionUtils() { } diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index c1033ae647d4..fb65b37fe2f5 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -384,7 +384,7 @@ Pair, Map>> checkSegment // drop derivative segments which interval equals the interval in toDeleteBaseSegments for (Interval interval : toDropInterval.keySet()) { for (DataSegment segment : derivativeSegments.get(interval)) { - segmentManager.removeSegment(segment.getId()); + segmentManager.removeSegment(segment.getId().toString()); } } // data of the latest interval will be built firstly. 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java index 7d5c64c2c61c..fa30dde802b8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java @@ -73,7 +73,7 @@ public void setup() throws IOException expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval())); - expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId())); + expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId().toString())); } private DataSegment createSegment(Interval interval, String version) diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java index e2703645708e..db1fbef2499a 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java @@ -59,14 +59,9 @@ public interface MetadataSegmentManager boolean removeDataSource(String dataSource); /** - * Prefer {@link #removeSegment(SegmentId)} to this method when possible. - * - * This method is not removed because {@link org.apache.druid.server.http.DataSourcesResource#deleteDatasourceSegment} - * uses it and if it migrates to {@link #removeSegment(SegmentId)} the performance will be worse. + * Removes the given segmentId from metadata store. Returns true if one or more rows were affected. 
*/ - boolean removeSegment(String dataSource, String segmentId); - - boolean removeSegment(SegmentId segmentId); + boolean removeSegment(String segmentId); long disableSegments(String dataSource, Collection segmentIds); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index 276e0ef1098d..2efd18dfd22a 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -20,7 +20,6 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -42,6 +41,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Duration; import org.joda.time.Interval; import org.skife.jdbi.v2.BaseResultSetMapper; @@ -467,22 +467,10 @@ public boolean removeDataSource(final String dataSource) * snapshot update. The segment removal will be reflected after next poll cyccle runs. */ @Override - public boolean removeSegment(String dataSourceName, final String identifier) - { - final SegmentId segmentId = SegmentId.tryParse(dataSourceName, identifier); - Preconditions.checkNotNull(segmentId); - return removeSegment(segmentId); - } - - /** - * This method does not update {@code dataSourcesSnapshot}, see the comments in {@code doPoll()} about - * snapshot update. The segment removal will be reflected after next poll cyccle runs. 
- */ - @Override - public boolean removeSegment(SegmentId segmentId) + public boolean removeSegment(String segmentId) { try { - return removeSegmentFromTable(segmentId.toString()); + return removeSegmentFromTable(segmentId); } catch (Exception e) { log.error(e, e.toString()); @@ -729,14 +717,21 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE // a segment remove call reflected in MetadataResource API calls. These updates outside of scheduled poll may be // added back in removeDataSource and removeSegment methods after the on-demand polling changes from // https://github.com/apache/incubator-druid/pull/7653 are in. - dataSourcesSnapshot = new DataSourcesSnapshot(newDataSources.entrySet() - .stream() - .collect(Collectors.toMap( - e -> e.getKey(), - e -> e.getValue().toImmutableDruidDataSource() - ))); + final Map updatedDataSources = CollectionUtils.transformValues( + newDataSources, + v -> v.toImmutableDruidDataSource() + ); + dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources); } + /** + * But in this case, it's actually better to extract the first part as a method + * Map transformValues(Map map, Function valueMapper) in CollectionUtils + * because this boilerplate code appears in several places in the codebase + */ + + + /** * For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough * (i. e. promoted to old generation), try to keep them alive. 
In {@link #poll()}, we fetch and deserialize all diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index fbcb9104aace..ba4296892a0b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -374,7 +374,7 @@ public CoordinatorCompactionConfig getCompactionConfig() public void removeSegment(DataSegment segment) { log.info("Removing Segment[%s]", segment.getId()); - metadataSegmentManager.removeSegment(segment.getId()); + metadataSegmentManager.removeSegment(segment.getId().toString()); } public String getCurrentLeader() diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 5904d52583bb..3787cc6b8149 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -434,7 +434,7 @@ public Response deleteDatasourceSegment( @PathParam("segmentId") String segmentId ) { - if (databaseSegmentManager.removeSegment(dataSourceName, segmentId)) { + if (databaseSegmentManager.removeSegment(segmentId)) { return Response.ok().build(); } return Response.noContent().build(); diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java index e7964481688a..a5d436b226e3 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java @@ -294,7 +294,7 @@ public void testRemoveDataSegment() throws IOException publisher.publishSegment(newSegment); 
Assert.assertNull(manager.getDataSource(newDataSource)); - Assert.assertTrue(manager.removeSegment(newSegment.getId())); + Assert.assertTrue(manager.removeSegment(newSegment.getId().toString())); } @Test From 1183f1f66b2ceb3621fd63989acad424f9205c91 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 6 Jun 2019 11:25:35 -0700 Subject: [PATCH 17/18] PR comments --- .../java/org/apache/druid/utils/CollectionUtils.java | 6 ++++-- .../druid/metadata/SQLMetadataSegmentManager.java | 12 +++--------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index 5863f015de7f..447b849e6b14 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -19,6 +19,8 @@ package org.apache.druid.utils; +import com.google.common.collect.Maps; + import java.util.AbstractCollection; import java.util.Collection; import java.util.HashMap; @@ -75,9 +77,9 @@ public int size() * Returns a transformed map from the given input map where the value is modified based on the given valueMapper * function. 
*/ - public static Map transformValues(Map map, Function valueMapper) + public static Map mapValues(Map map, Function valueMapper) { - final Map result = new HashMap<>(); + final Map result = Maps.newHashMapWithExpectedSize(map.size()); map.forEach((k, v) -> result.put(k, valueMapper.apply(v))); return result; } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index 2efd18dfd22a..bb8514291df1 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -106,6 +106,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private final SQLMetadataConnector connector; // Volatile since this reference is reassigned in "poll" and then read from in other threads. + // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty dataSources map and + // empty overshadowedSegments set). // Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between // null and nonnull multiple times as stop() and start() are called. @Nullable @@ -717,21 +719,13 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE // a segment remove call reflected in MetadataResource API calls. These updates outside of scheduled poll may be // added back in removeDataSource and removeSegment methods after the on-demand polling changes from // https://github.com/apache/incubator-druid/pull/7653 are in. 
- final Map updatedDataSources = CollectionUtils.transformValues( + final Map updatedDataSources = CollectionUtils.mapValues( newDataSources, v -> v.toImmutableDruidDataSource() ); dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources); } - /** - * But in this case, it's actually better to extract the first part as a method - * Map transformValues(Map map, Function valueMapper) in CollectionUtils - * because this boilerplate code appears in several places in the codebase - */ - - - /** * For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough * (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all From 59395fa520b8bd8bfb30262eeafdc8fdcc85b375 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 6 Jun 2019 11:34:02 -0700 Subject: [PATCH 18/18] removed unused import --- core/src/main/java/org/apache/druid/utils/CollectionUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index 447b849e6b14..af3cf077f430 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -23,7 +23,6 @@ import java.util.AbstractCollection; import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Spliterator;