From 295e5001dc3622edaa350121e3e0aad2f0c74cb1 Mon Sep 17 00:00:00 2001 From: Surekha Date: Fri, 7 Jun 2019 10:15:54 -0700 Subject: [PATCH] Optimize overshadowed segments computation (#7595) * Move the overshadowed segment computation to SQLMetadataSegmentManager's poll * rename method in MetadataSegmentManager * Fix tests * PR comments * PR comments * PR comments * fix indentation * fix tests * fix test * add test for SegmentWithOvershadowedStatus serde format * PR comments * PR comments * fix test * remove snapshot updates outside poll * PR comments * PR comments * PR comments * removed unused import --- .../SegmentWithOvershadowedStatus.java | 7 + .../apache/druid/utils/CollectionUtils.java | 19 +- .../SegmentWithOvershadowedStatusTest.java | 180 ++++++++++++++++++ .../MaterializedViewSupervisor.java | 2 +- .../actions/SegmentListActionsTest.java | 2 +- .../auth_test_sys_schema_segments.json | 2 +- .../druid/client/DataSourcesSnapshot.java | 114 +++++++++++ .../client/ImmutableDruidDataSource.java | 40 ---- .../metadata/MetadataSegmentManager.java | 27 ++- .../metadata/SQLMetadataSegmentManager.java | 132 ++++++------- .../server/coordinator/DruidCoordinator.java | 35 +++- .../DruidCoordinatorRuntimeParams.java | 22 ++- .../helper/DruidCoordinatorRuleRunner.java | 16 +- .../server/http/DataSourcesResource.java | 2 +- .../druid/server/http/MetadataResource.java | 25 +-- .../SQLMetadataSegmentManagerTest.java | 2 +- .../CuratorDruidCoordinatorTest.java | 6 + .../DruidCoordinatorRuleRunnerTest.java | 19 +- .../coordinator/DruidCoordinatorTest.java | 22 ++- .../sql/calcite/schema/SystemSchema.java | 13 +- 20 files changed, 514 insertions(+), 173 deletions(-) create mode 100644 core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java create mode 100644 server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java index e86daea7d86e..3f2972fd07e9 100644 --- a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java +++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonUnwrapped; /** * DataSegment object plus the overshadowed status for the segment. An immutable object. @@ -31,6 +32,12 @@ public class SegmentWithOvershadowedStatus implements Comparable { private final boolean overshadowed; + /** + * dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of + * enclosing class. If in future, if {@Code SegmentWithOvershadowedStatus} were to extend {@link DataSegment}, + * there will be no change in the serialized format. + */ + @JsonUnwrapped private final DataSegment dataSegment; @JsonCreator diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index 6b4d3cc3df41..af3cf077f430 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -19,10 +19,14 @@ package org.apache.druid.utils; +import com.google.common.collect.Maps; + import java.util.AbstractCollection; import java.util.Collection; import java.util.Iterator; +import java.util.Map; import java.util.Spliterator; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Stream; @@ -68,5 +72,18 @@ public int size() }; } - private CollectionUtils() {} + /** + * Returns a transformed map from the given input map where the value is modified based on the given valueMapper + * function. + */ + public static Map mapValues(Map map, Function valueMapper) + { + final Map result = Maps.newHashMapWithExpectedSize(map.size()); + map.forEach((k, v) -> result.put(k, valueMapper.apply(v))); + return result; + } + + private CollectionUtils() + { + } } diff --git a/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java new file mode 100644 index 000000000000..050f9e04934b --- /dev/null +++ b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.TestObjectMapper; +import org.apache.druid.jackson.CommaListJoinDeserializer; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.timeline.partition.NoneShardSpec; +import org.apache.druid.timeline.partition.ShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class SegmentWithOvershadowedStatusTest +{ + private static final ObjectMapper mapper = new TestObjectMapper(); + private static final int TEST_VERSION = 0x9; + + @Before + public void setUp() + { + InjectableValues.Std injectableValues = new InjectableValues.Std(); + injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT); + mapper.setInjectableValues(injectableValues); + } + + @Test + public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws Exception + { + final Interval interval = Intervals.of("2011-10-01/2011-10-02"); + final ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); + + final DataSegment dataSegment = new DataSegment( + "something", + interval, + "1", + loadSpec, + Arrays.asList("dim1", "dim2"), + Arrays.asList("met1", "met2"), + NoneShardSpec.instance(), + TEST_VERSION, + 1 + ); + + final SegmentWithOvershadowedStatus segment = new SegmentWithOvershadowedStatus(dataSegment, false); + + final Map objectMap = mapper.readValue( + mapper.writeValueAsString(segment), + JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT + ); + + Assert.assertEquals(11, objectMap.size()); + Assert.assertEquals("something", objectMap.get("dataSource")); + Assert.assertEquals(interval.toString(), objectMap.get("interval")); + Assert.assertEquals("1", objectMap.get("version")); + Assert.assertEquals(loadSpec, objectMap.get("loadSpec")); + Assert.assertEquals("dim1,dim2", objectMap.get("dimensions")); + Assert.assertEquals("met1,met2", objectMap.get("metrics")); + Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec")); + Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion")); + Assert.assertEquals(1, objectMap.get("size")); + Assert.assertEquals(false, objectMap.get("overshadowed")); + + final String json = mapper.writeValueAsString(segment); + + final TestSegmentWithOvershadowedStatus deserializedSegment = mapper.readValue( + json, + TestSegmentWithOvershadowedStatus.class + ); + + Assert.assertEquals(segment.getDataSegment().getDataSource(), deserializedSegment.getDataSource()); + Assert.assertEquals(segment.getDataSegment().getInterval(), deserializedSegment.getInterval()); + Assert.assertEquals(segment.getDataSegment().getVersion(), deserializedSegment.getVersion()); + Assert.assertEquals(segment.getDataSegment().getLoadSpec(), deserializedSegment.getLoadSpec()); + Assert.assertEquals(segment.getDataSegment().getDimensions(), deserializedSegment.getDimensions()); + Assert.assertEquals(segment.getDataSegment().getMetrics(), deserializedSegment.getMetrics()); + Assert.assertEquals(segment.getDataSegment().getShardSpec(), deserializedSegment.getShardSpec()); + Assert.assertEquals(segment.getDataSegment().getSize(), deserializedSegment.getSize()); + Assert.assertEquals(segment.getDataSegment().getId(), deserializedSegment.getId()); + + } +} + +/** + * Subclass of DataSegment with overshadowed status + */ +class TestSegmentWithOvershadowedStatus extends DataSegment +{ + private final boolean overshadowed; + + @JsonCreator + public TestSegmentWithOvershadowedStatus( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("version") String version, + @JsonProperty("loadSpec") @Nullable Map loadSpec, + @JsonProperty("dimensions") + @JsonDeserialize(using = CommaListJoinDeserializer.class) + @Nullable + List dimensions, + @JsonProperty("metrics") + @JsonDeserialize(using = CommaListJoinDeserializer.class) + @Nullable + List metrics, + @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec, + @JsonProperty("binaryVersion") Integer binaryVersion, + @JsonProperty("size") long size, + @JsonProperty("overshadowed") boolean overshadowed + ) + { + super( + dataSource, + interval, + version, + loadSpec, + dimensions, + metrics, + shardSpec, + binaryVersion, + size + ); + this.overshadowed = overshadowed; + } + + @JsonProperty + public boolean isOvershadowed() + { + return overshadowed; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof TestSegmentWithOvershadowedStatus)) { + return false; + } + if (!super.equals(o)) { + return false; + } + final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o; + if (overshadowed != (that.overshadowed)) { + return false; + } + return true; + } + +} diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 105afdf8f23f..efbdcbe66153 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -365,7 +365,7 @@ Pair, Map>> checkSegment // drop derivative segments which interval equals the interval in toDeleteBaseSegments for (Interval interval : toDropInterval.keySet()) { for (DataSegment segment : derivativeSegments.get(interval)) { - segmentManager.removeSegment(segment.getId()); + segmentManager.removeSegment(segment.getId().toString()); } } // data of the latest interval will be built firstly. diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java index 7d5c64c2c61c..fa30dde802b8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java @@ -73,7 +73,7 @@ public void setup() throws IOException expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval())); - expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId())); + expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId().toString())); } private DataSegment createSegment(Interval interval, String version) diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json index f2046dedf3a6..4437e725e28b 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json @@ -13,6 +13,6 @@ "is_available": 1, "is_realtime": 0, "is_overshadowed": 0, - "payload": "{\"dataSegment\":{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"},\"overshadowed\":false}" + "payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\",\"overshadowed\":false}" } ] diff --git a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java new file mode 100644 index 000000000000..84176621e4b7 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Ordering; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.VersionedIntervalTimeline; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * An immutable snapshot of fields from {@link org.apache.druid.metadata.SQLMetadataSegmentManager} (dataSources and + * overshadowedSegments). Getters of {@link org.apache.druid.metadata.MetadataSegmentManager} should use this snapshot + * to return dataSources and overshadowedSegments. + */ +public class DataSourcesSnapshot +{ + private final Map dataSources; + private final ImmutableSet overshadowedSegments; + + public DataSourcesSnapshot( + Map dataSources + ) + { + this.dataSources = dataSources; + this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments()); + } + + public Collection getDataSources() + { + return dataSources.values(); + } + + public Map getDataSourcesMap() + { + return dataSources; + } + + @Nullable + public ImmutableDruidDataSource getDataSource(String dataSourceName) + { + return dataSources.get(dataSourceName); + } + + public ImmutableSet getOvershadowedSegments() + { + return overshadowedSegments; + } + + @Nullable + public Iterable iterateAllSegmentsInSnapshot() + { + if (dataSources == null) { + return null; + } + return () -> dataSources.values().stream() + .flatMap(dataSource -> dataSource.getSegments().stream()) + .iterator(); + } + + /** + * This method builds timelines from all dataSources and finds the overshadowed segments list + * + * @return overshadowed segment Ids list + */ + private List determineOvershadowedSegments() + { + final List segments = dataSources.values().stream() + .flatMap(ds -> ds.getSegments().stream()) + .collect(Collectors.toList()); + final Map> timelines = new HashMap<>(); + segments.forEach(segment -> timelines + .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) + .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); + + // It's fine to add all overshadowed segments to a single collection because only + // a small fraction of the segments in the cluster are expected to be overshadowed, + // so building this collection shouldn't generate a lot of garbage. + final List overshadowedSegments = new ArrayList<>(); + for (DataSegment dataSegment : segments) { + final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); + if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { + overshadowedSegments.add(dataSegment.getId()); + } + } + return overshadowedSegments; + } + +} diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java index 841b7169458e..59539443b618 100644 --- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java +++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java @@ -25,17 +25,12 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; -import com.google.common.collect.Ordering; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.VersionedIntervalTimeline; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Set; /** * An immutable collection of metadata of segments ({@link DataSegment} objects), belonging to a particular data source. @@ -114,41 +109,6 @@ public long getTotalSizeOfSegments() return totalSizeOfSegments; } - /** - * This method finds the overshadowed segments from the given segments - * - * @return set of overshadowed segments - */ - public static Set determineOvershadowedSegments(Iterable segments) - { - final Map> timelines = buildTimelines(segments); - - final Set overshadowedSegments = new HashSet<>(); - for (DataSegment dataSegment : segments) { - final VersionedIntervalTimeline timeline = timelines.get(dataSegment.getDataSource()); - if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) { - overshadowedSegments.add(dataSegment); - } - } - return overshadowedSegments; - } - - /** - * Builds a timeline from given segments - * - * @return map of datasource to VersionedIntervalTimeline of segments - */ - private static Map> buildTimelines( - Iterable segments - ) - { - final Map> timelines = new HashMap<>(); - segments.forEach(segment -> timelines - .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural())) - .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment))); - return timelines; - } - @Override public String toString() { diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java index 4fc517718f2a..db1fbef2499a 100644 --- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -28,6 +29,7 @@ import javax.annotation.Nullable; import java.util.Collection; import java.util.List; +import java.util.Set; /** */ @@ -57,14 +59,9 @@ public interface MetadataSegmentManager boolean removeDataSource(String dataSource); /** - * Prefer {@link #removeSegment(SegmentId)} to this method when possible. - * - * This method is not removed because {@link org.apache.druid.server.http.DataSourcesResource#deleteDatasourceSegment} - * uses it and if it migrates to {@link #removeSegment(SegmentId)} the performance will be worse. + * Removes the given segmentId from metadata store. Returns true if one or more rows were affected. */ - boolean removeSegment(String dataSource, String segmentId); - - boolean removeSegment(SegmentId segmentId); + boolean removeSegment(String segmentId); long disableSegments(String dataSource, Collection segmentIds); @@ -98,6 +95,22 @@ public interface MetadataSegmentManager Collection getAllDataSourceNames(); + /** + * Returns a set of overshadowed segment Ids + * + * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has + * not yet been polled.) + */ + @Nullable + Set getOvershadowedSegments(); + + /** + * Returns a snapshot of DruidDataSources and overshadowed segments + * + */ + @Nullable + DataSourcesSnapshot getDataSourcesSnapshot(); + /** * Returns top N unused segment intervals in given interval when ordered by segment start time, end time. */ diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index 35843c95b9dd..bb8514291df1 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.inject.Inject; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.guice.ManageLifecycle; @@ -40,6 +41,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Duration; import org.joda.time.Interval; import org.skife.jdbi.v2.BaseResultSetMapper; @@ -65,6 +67,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -103,11 +106,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager private final SQLMetadataConnector connector; // Volatile since this reference is reassigned in "poll" and then read from in other threads. - // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map). + // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty dataSources map and + // empty overshadowedSegments set). // Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between // null and nonnull multiple times as stop() and start() are called. @Nullable - private volatile ConcurrentHashMap dataSources = null; + private volatile DataSourcesSnapshot dataSourcesSnapshot = null; /** * The number of times this SQLMetadataSegmentManager was started. @@ -206,8 +210,7 @@ public void stop() if (!isStarted()) { return; } - - dataSources = null; + dataSourcesSnapshot = null; currentStartOrder = -1; exec.shutdownNow(); exec = null; @@ -449,8 +452,6 @@ public boolean removeDataSource(final String dataSource) ).bind("dataSource", dataSource).execute() ); - Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource)); - if (removed == 0) { return false; } @@ -463,58 +464,15 @@ public boolean removeDataSource(final String dataSource) return true; } + /** + * This method does not update {@code dataSourcesSnapshot}, see the comments in {@code doPoll()} about + * snapshot update. The segment removal will be reflected after next poll cyccle runs. + */ @Override - public boolean removeSegment(String dataSourceName, final String segmentId) - { - try { - final boolean removed = removeSegmentFromTable(segmentId); - - // Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a - // potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast. - List possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId); - Optional.ofNullable(dataSources).ifPresent( - m -> - m.computeIfPresent( - dataSourceName, - (dsName, dataSource) -> { - for (SegmentId possibleSegmentId : possibleSegmentIds) { - if (dataSource.removeSegment(possibleSegmentId) != null) { - break; - } - } - // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. - //noinspection ReturnOfNull - return dataSource.isEmpty() ? null : dataSource; - } - ) - ); - - return removed; - } - catch (Exception e) { - log.error(e, e.toString()); - return false; - } - } - - @Override - public boolean removeSegment(SegmentId segmentId) + public boolean removeSegment(String segmentId) { try { - final boolean removed = removeSegmentFromTable(segmentId.toString()); - Optional.ofNullable(dataSources).ifPresent( - m -> - m.computeIfPresent( - segmentId.getDataSource(), - (dsName, dataSource) -> { - dataSource.removeSegment(segmentId); - // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry. - //noinspection ReturnOfNull - return dataSource.isEmpty() ? null : dataSource; - } - ) - ); - return removed; + return removeSegmentFromTable(segmentId); } catch (Exception e) { log.error(e, e.toString()); @@ -607,37 +565,47 @@ public boolean isStarted() @Nullable public ImmutableDruidDataSource getDataSource(String dataSourceName) { - final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null); - return dataSource == null ? null : dataSource.toImmutableDruidDataSource(); + final ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSourcesMap().get(dataSourceName)) + .orElse(null); + return dataSource == null ? null : dataSource; } @Override @Nullable public Collection getDataSources() { - return Optional.ofNullable(dataSources) - .map(m -> - m.values() - .stream() - .map(DruidDataSource::toImmutableDruidDataSource) - .collect(Collectors.toList()) - ) - .orElse(null); + return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getDataSources()).orElse(null); } @Override @Nullable public Iterable iterateAllSegments() { - final ConcurrentHashMap dataSourcesSnapshot = dataSources; - if (dataSourcesSnapshot == null) { + final Collection dataSources = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSources()) + .orElse(null); + if (dataSources == null) { return null; } - return () -> dataSourcesSnapshot.values() - .stream() - .flatMap(dataSource -> dataSource.getSegments().stream()) - .iterator(); + return () -> dataSources.stream() + .flatMap(dataSource -> dataSource.getSegments().stream()) + .iterator(); + } + + @Override + @Nullable + public Set getOvershadowedSegments() + { + return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getOvershadowedSegments()).orElse(null); + } + + @Nullable + @Override + public DataSourcesSnapshot getDataSourcesSnapshot() + { + return dataSourcesSnapshot; } @Override @@ -742,14 +710,26 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE .addSegmentIfAbsent(segment); }); - // Replace "dataSources" atomically. - dataSources = newDataSources; + // dataSourcesSnapshot is updated only here, please note that if datasources or segments are enabled or disabled + // outside of poll, the dataSourcesSnapshot can become invalid until the next poll cycle. + // DataSourcesSnapshot computes the overshadowed segments, which makes it an expensive operation if the + // snapshot is invalidated on each segment removal, especially if a user issues a lot of single segment remove + // calls in rapid succession. So the snapshot update is not done outside of poll at this time. + // Updates outside of poll(), were primarily for the user experience, so users would immediately see the effect of + // a segment remove call reflected in MetadataResource API calls. These updates outside of scheduled poll may be + // added back in removeDataSource and removeSegment methods after the on-demand polling changes from + // https://github.com/apache/incubator-druid/pull/7653 are in. + final Map updatedDataSources = CollectionUtils.mapValues( + newDataSources, + v -> v.toImmutableDruidDataSource() + ); + dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources); } /** * For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough * (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all - * existing segments each time, and then replace them in {@link #dataSources}. This method allows to use already + * existing segments each time, and then replace them in {@link #dataSourcesSnapshot}. This method allows to use already * existing (old) segments when possible, effectively interning them a-la {@link String#intern} or {@link * com.google.common.collect.Interner}, aiming to make the majority of {@link DataSegment} objects garbage soon after * they are deserialized and to die in young generation. It allows to avoid fragmentation of the old generation and @@ -757,7 +737,9 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE */ private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment) { - DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null); + ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSourcesMap().get(segment.getDataSource())) + .orElse(null); if (dataSource == null) { return segment; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 27bee2f0d225..ba4296892a0b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -29,6 +30,7 @@ import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ZKPaths; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; @@ -82,6 +84,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -145,6 +148,10 @@ public class DruidCoordinator private volatile boolean started = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; + /** + * set in {@link CoordinatorRunnable#run()} at start of every coordinator run + */ + private volatile DataSourcesSnapshot dataSourcesSnapshot = null; @Inject public DruidCoordinator( @@ -316,7 +323,9 @@ public Object2LongMap getSegmentAvailability() public Map getLoadStatus() { final Map loadStatus = new HashMap<>(); - final Collection dataSources = metadataSegmentManager.getDataSources(); + final Collection dataSources = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSources()) + .orElse(null); if (dataSources == null) { return loadStatus; @@ -365,7 +374,7 @@ public CoordinatorCompactionConfig getCompactionConfig() public void removeSegment(DataSegment segment) { log.info("Removing Segment[%s]", segment.getId()); - metadataSegmentManager.removeSegment(segment.getId()); + metadataSegmentManager.removeSegment(segment.getId().toString()); } public String getCurrentLeader() @@ -373,6 +382,12 @@ public String getCurrentLeader() return coordLeaderSelector.getCurrentLeader(); } + @VisibleForTesting + void setDataSourcesSnapshotForTest(DataSourcesSnapshot snapshot) + { + dataSourcesSnapshot = snapshot; + } + public void moveSegment( ImmutableDruidServer fromServer, ImmutableDruidServer toServer, @@ -393,7 +408,9 @@ public void moveSegment( throw new IAE("Cannot move [%s] to and from the same server [%s]", segmentId, fromServer.getName()); } - ImmutableDruidDataSource dataSource = metadataSegmentManager.getDataSource(segment.getDataSource()); + ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSource(segment.getDataSource())) + .orElse(null); if (dataSource == null) { throw new IAE("Unable to find dataSource for segment [%s] in metadata", segmentId); } @@ -483,7 +500,10 @@ public void moveSegment( @Nullable public Iterable iterateAvailableDataSegments() { - return metadataSegmentManager.iterateAllSegments(); + final Iterable dataSources = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.iterateAllSegmentsInSnapshot()) + .orElse(null); + return dataSources == null ? null : dataSources; } @LifecycleStart @@ -670,7 +690,11 @@ public void run() BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); // Do coordinator stuff. - final Collection dataSources = metadataSegmentManager.getDataSources(); + dataSourcesSnapshot = metadataSegmentManager.getDataSourcesSnapshot(); + final Collection dataSources = Optional.ofNullable(dataSourcesSnapshot) + .map(m -> m.getDataSources()) + .orElse(null); + if (dataSources == null) { log.info("Metadata store not polled yet, skipping this run."); return; @@ -684,6 +708,7 @@ public void run() .withCompactionConfig(getCompactionConfig()) .withEmitter(emitter) .withBalancerStrategy(balancerStrategy) + .withDataSourcesSnapshot(dataSourcesSnapshot) .build(); for (DruidCoordinatorHelper helper : helpers) { // Don't read state and run state in the same helper otherwise racy conditions may exist diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 655d6bdf22ff..de75bce9d9c2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -66,6 +67,7 @@ public static TreeSet createAvailableSegmentsSet(Iterable coordinatorDynamicConfig.getMillisToWaitBeforeDeleting()); @@ -237,6 +246,7 @@ public static class Builder private CoordinatorStats stats; private DateTime balancerReferenceTimestamp; private BalancerStrategy balancerStrategy; + private DataSourcesSnapshot dataSourcesSnapshot; Builder() { @@ -253,6 +263,7 @@ public static class Builder this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build(); this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty(); this.balancerReferenceTimestamp = DateTimes.nowUtc(); + this.dataSourcesSnapshot = null; } Builder( @@ -304,7 +315,8 @@ public DruidCoordinatorRuntimeParams build() coordinatorCompactionConfig, stats, balancerReferenceTimestamp, - balancerStrategy + balancerStrategy, + dataSourcesSnapshot ); } @@ -434,5 +446,11 @@ public Builder withBalancerStrategy(BalancerStrategy balancerStrategy) this.balancerStrategy = balancerStrategy; return this; } + + public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot) + { + this.dataSourcesSnapshot = snapshot; + return this; + } } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java index bbceaafed273..7570c81cbe13 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java @@ -19,8 +19,9 @@ package org.apache.druid.server.coordinator.helper; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataRuleManager; @@ -35,6 +36,7 @@ import org.joda.time.DateTime; import java.util.List; +import java.util.Optional; import java.util.Set; /** @@ -84,8 +86,14 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // find available segments which are not overshadowed by other segments in DB // only those would need to be loaded/dropped // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed - final Set overshadowed = ImmutableDruidDataSource - .determineOvershadowedSegments(params.getAvailableSegments()); + // If metadata store hasn't been polled yet, use empty overshadowed list + final DataSourcesSnapshot dataSourcesSnapshot = params.getDataSourcesSnapshot(); + Set overshadowed = ImmutableSet.of(); + if (dataSourcesSnapshot != null) { + overshadowed = Optional + .ofNullable(dataSourcesSnapshot.getOvershadowedSegments()) + .orElse(ImmutableSet.of()); + } for (String tier : cluster.getTierNames()) { replicatorThrottler.updateReplicationState(tier); @@ -103,7 +111,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final List segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES); int missingRules = 0; for (DataSegment segment : params.getAvailableSegments()) { - if (overshadowed.contains(segment)) { + if (overshadowed.contains(segment.getId())) { // Skipping overshadowed segments continue; } diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index 5904d52583bb..3787cc6b8149 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -434,7 +434,7 @@ public Response deleteDatasourceSegment( @PathParam("segmentId") String segmentId ) { - if (databaseSegmentManager.removeSegment(dataSourceName, segmentId)) { + if (databaseSegmentManager.removeSegment(segmentId)) { return Response.ok().build(); } return Response.noContent().build(); diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 3c7e8ac37b83..556ed3d8dd49 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -52,7 +52,6 @@ import javax.ws.rs.core.Response; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -163,11 +162,11 @@ public Response getDatabaseSegments( final Stream metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream()); if (includeOvershadowedStatus != null) { - final Iterable authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus( - req, - druidDataSources, - metadataSegments - ); + final Iterable authorizedSegments = + findAuthorizedSegmentWithOvershadowedStatus( + req, + metadataSegments + ); Response.ResponseBuilder builder = Response.status(Response.Status.OK); return builder.entity(authorizedSegments).build(); } else { @@ -189,22 +188,18 @@ public Response getDatabaseSegments( private Iterable findAuthorizedSegmentWithOvershadowedStatus( HttpServletRequest req, - Collection druidDataSources, Stream metadataSegments ) { - // It's fine to add all overshadowed segments to a single collection because only - // a small fraction of the segments in the cluster are expected to be overshadowed, - // so building this collection shouldn't generate a lot of garbage. - final Set overshadowedSegments = new HashSet<>(); - for (ImmutableDruidDataSource dataSource : druidDataSources) { - overshadowedSegments.addAll(ImmutableDruidDataSource.determineOvershadowedSegments(dataSource.getSegments())); - } + // If metadata store hasn't been polled yet, use empty overshadowed list + final Set overshadowedSegments = Optional + .ofNullable(metadataSegmentManager.getOvershadowedSegments()) + .orElse(Collections.emptySet()); final Stream segmentsWithOvershadowedStatus = metadataSegments .map(segment -> new SegmentWithOvershadowedStatus( segment, - overshadowedSegments.contains(segment) + overshadowedSegments.contains(segment.getId()) )); final Function> raGenerator = segment -> Collections diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java index e7964481688a..a5d436b226e3 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java @@ -294,7 +294,7 @@ public void testRemoveDataSegment() throws IOException publisher.publishSegment(newSegment); Assert.assertNull(manager.getDataSource(newDataSource)); - Assert.assertTrue(manager.removeSegment(newSegment.getId())); + Assert.assertTrue(manager.removeSegment(newSegment.getId().toString())); } @Test diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java index 3b2223514f59..9cd02069e01e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -32,6 +32,7 @@ import org.apache.druid.client.BatchServerInventoryView; import org.apache.druid.client.CoordinatorSegmentWatcherConfig; import org.apache.druid.client.CoordinatorServerView; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.common.config.JacksonConfigManager; @@ -96,6 +97,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase private ObjectMapper objectMapper; private JacksonConfigManager configManager; private DruidNode druidNode; + private DataSourcesSnapshot dataSourcesSnapshot; private static final String SEGPATH = "/druid/segments"; private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1"; private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2"; @@ -127,6 +129,7 @@ public void setUp() throws Exception databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class); metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class); configManager = EasyMock.createNiceMock(JacksonConfigManager.class); + dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class); EasyMock.expect( configManager.watch( EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY), @@ -368,6 +371,9 @@ public void testMoveSegment() throws Exception EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource); EasyMock.replay(databaseSegmentManager); + coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot); + EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes(); + EasyMock.replay(dataSourcesSnapshot); coordinator.moveSegment( source.toImmutableDruidServer(), dest.toImmutableDruidServer(), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 51a79307c752..7d38fa09b8f2 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -20,9 +20,11 @@ package org.apache.druid.server.coordinator; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -30,6 +32,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; import org.apache.druid.metadata.MetadataRuleManager; +import org.apache.druid.metadata.MetadataSegmentManager; import org.apache.druid.segment.IndexIO; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.helper.DruidCoordinatorRuleRunner; @@ -67,6 +70,8 @@ public class DruidCoordinatorRuleRunnerTest private DruidCoordinatorRuleRunner ruleRunner; private ServiceEmitter emitter; private MetadataRuleManager databaseRuleManager; + private MetadataSegmentManager databaseSegmentManager; + private DataSourcesSnapshot dataSourcesSnapshot; @Before public void setUp() @@ -76,6 +81,8 @@ public void setUp() emitter = EasyMock.createMock(ServiceEmitter.class); EmittingLogger.registerEmitter(emitter); databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class); + databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class); + dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class); DateTime start = DateTimes.of("2012-01-01"); availableSegments = new ArrayList<>(); @@ -989,7 +996,9 @@ public void testDropServerActuallyServesSegment() @Test public void testReplicantThrottle() { - mockCoordinator(); + EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); + EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); + EasyMock.replay(coordinator, databaseSegmentManager, dataSourcesSnapshot); mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expectLastCall().atLeastOnce(); mockEmptyPeon(); @@ -1114,6 +1123,8 @@ public void testReplicantThrottleAcrossTiers() .build() ) .atLeastOnce(); + EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes(); + EasyMock.replay(dataSourcesSnapshot); coordinator.removeSegment(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); EasyMock.replay(coordinator); @@ -1330,8 +1341,9 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() ); availableSegments.add(v1); availableSegments.add(v2); - - mockCoordinator(); + EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of(v1.getId())).anyTimes(); + EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); + EasyMock.replay(coordinator, dataSourcesSnapshot); mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject()); EasyMock.expectLastCall().once(); mockEmptyPeon(); @@ -1375,6 +1387,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .withDataSourcesSnapshot(dataSourcesSnapshot) .build(); DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 793bd287b950..bb44a6c257ad 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -29,6 +29,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; +import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.DruidDataSource; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; @@ -99,6 +100,7 @@ public class DruidCoordinatorTest extends CuratorTestBase private ObjectMapper objectMapper; private DruidNode druidNode; private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter(); + private DataSourcesSnapshot dataSourcesSnapshot; @Before public void setUp() throws Exception @@ -106,8 +108,12 @@ public void setUp() throws Exception druidServer = EasyMock.createMock(DruidServer.class); serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class); databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class); + dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class); metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class); JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class); + EasyMock.expect(databaseSegmentManager.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes(); + EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes(); + EasyMock.replay(databaseSegmentManager); EasyMock.expect( configManager.watch( EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY), @@ -248,8 +254,9 @@ public void testMoveSegment() ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class); EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(segment); EasyMock.replay(druidDataSource); - EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource); - EasyMock.replay(databaseSegmentManager); + coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot); + EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes(); + EasyMock.replay(dataSourcesSnapshot); scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); EasyMock.replay(scheduledExecutorFactory); EasyMock.replay(metadataRuleManager); @@ -533,20 +540,15 @@ public void testCoordinatorTieredRun() throws Exception private void setupMetadataSegmentManagerMock(DruidDataSource dataSource) { - EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes(); EasyMock - .expect(databaseSegmentManager.iterateAllSegments()) + .expect(dataSourcesSnapshot.iterateAllSegmentsInSnapshot()) .andReturn(dataSource.getSegments()) .anyTimes(); EasyMock - .expect(databaseSegmentManager.getDataSources()) + .expect(dataSourcesSnapshot.getDataSources()) .andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource())) .anyTimes(); - EasyMock - .expect(databaseSegmentManager.getAllDataSourceNames()) - .andReturn(Collections.singleton(dataSource.getName())) - .anyTimes(); - EasyMock.replay(databaseSegmentManager); + EasyMock.replay(dataSourcesSnapshot); } @Nullable diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index e5cfa911c452..028978835a5d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -348,12 +348,13 @@ private Iterator getAuthorizedPublishedSegments( final AuthenticationResult authenticationResult = (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( - authenticationResult, - () -> it, - SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR, - authorizerMapper - ); + final Iterable authorizedSegments = AuthorizationUtils + .filterAuthorizedResources( + authenticationResult, + () -> it, + SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR, + authorizerMapper + ); return authorizedSegments.iterator(); }