From f9ee4af3c9e4d9fe61444f06ed892a9904a6e331 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 9 Jun 2023 16:31:52 +0530 Subject: [PATCH 01/18] Add target replica column to sys table --- docs/api-reference/api-reference.md | 8 +-- ...ershadowedStatus.java => SegmentPlus.java} | 59 ++++++++++++------- ...edStatusTest.java => SegmentPlusTest.java} | 57 ++++++++++++------ .../server/coordinator/DruidCoordinator.java | 13 ++++ .../coordinator/SegmentReplicantLookup.java | 13 ++++ .../server/coordinator/rules/LoadRule.java | 7 +++ .../druid/server/http/MetadataResource.java | 30 ++++++---- .../calcite/schema/MetadataSegmentView.java | 27 +++++---- .../sql/calcite/schema/SystemSchema.java | 20 ++++--- .../sql/calcite/schema/SystemSchemaTest.java | 44 ++++++++------ 10 files changed, 185 insertions(+), 93 deletions(-) rename processing/src/main/java/org/apache/druid/timeline/{SegmentWithOvershadowedStatus.java => SegmentPlus.java} (57%) rename processing/src/test/java/org/apache/druid/timeline/{SegmentWithOvershadowedStatusTest.java => SegmentPlusTest.java} (78%) diff --git a/docs/api-reference/api-reference.md b/docs/api-reference/api-reference.md index 9b762c08183a..0e199fc03c18 100644 --- a/docs/api-reference/api-reference.md +++ b/docs/api-reference/api-reference.md @@ -171,13 +171,13 @@ Returns a list of all segments for each datasource enabled in the cluster. Returns a list of all segments for one or more specific datasources enabled in the cluster. -`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus` +`GET /druid/coordinator/v1/metadata/segments?includeFullDetails` -Returns a list of all segments for each datasource with the full segment metadata and an extra field `overshadowed`. +Returns a list of all segments for each datasource with the full segment metadata and extra fields `overshadowed` and `totalTargetReplicants`. 
-`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&datasources={dataSourceName1}&datasources={dataSourceName2}` +`GET /druid/coordinator/v1/metadata/segments?includeFullDetails&datasources={dataSourceName1}&datasources={dataSourceName2}` -Returns a list of all segments for one or more specific datasources with the full segment metadata and an extra field `overshadowed`. +Returns a list of all segments for one or more specific datasources with the full segment metadata and extra fields `overshadowed` and `totalTargetReplicants`. `GET /druid/coordinator/v1/metadata/datasources` diff --git a/processing/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/processing/src/main/java/org/apache/druid/timeline/SegmentPlus.java similarity index 57% rename from processing/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java rename to processing/src/main/java/org/apache/druid/timeline/SegmentPlus.java index 04c8bf4378b6..57d73d60fd2b 100644 --- a/processing/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java +++ b/processing/src/main/java/org/apache/druid/timeline/SegmentPlus.java @@ -23,39 +23,50 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonUnwrapped; +import javax.annotation.Nullable; +import java.util.Objects; + /** - * DataSegment object plus the overshadowed status for the segment. An immutable object. - * - * SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId} + * DataSegment object plus the overshadowed and target number of replicants for the segment. An immutable object. + *

+ * SegmentPlus's {@link #compareTo} method considers only the {@link SegmentId} * of the DataSegment object. */ -public class SegmentWithOvershadowedStatus implements Comparable +public class SegmentPlus implements Comparable { private final boolean overshadowed; + /** + * The target number of replicants for the segment, summed across all tiers. This value is null if the load rules for + * the segment have not been evaluated yet. + */ + private final Integer totalTargetReplicants; /** * dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of - * enclosing class. If in future, if {@code SegmentWithOvershadowedStatus} were to extend {@link DataSegment}, + * enclosing class. If, in the future, {@code SegmentPlus} were to extend {@link DataSegment}, * there will be no change in the serialized format. */ @JsonUnwrapped private final DataSegment dataSegment; @JsonCreator - public SegmentWithOvershadowedStatus( - @JsonProperty("overshadowed") boolean overshadowed + public SegmentPlus( + @JsonProperty("overshadowed") boolean overshadowed, + @JsonProperty("totalTargetReplicants") Integer targetReplicants ) { // Jackson will overwrite dataSegment if needed (even though the field is final) - this(null, overshadowed); + this(null, overshadowed, targetReplicants); } - public SegmentWithOvershadowedStatus( + public SegmentPlus( DataSegment dataSegment, - boolean overshadowed + boolean overshadowed, + Integer totalTargetReplicants ) { this.dataSegment = dataSegment; this.overshadowed = overshadowed; + this.totalTargetReplicants = totalTargetReplicants; } @JsonProperty @@ -70,35 +81,38 @@ public DataSegment getDataSegment() return dataSegment; } + @Nullable + @JsonProperty + public Integer getTotalTargetReplicants() + { + return totalTargetReplicants; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof SegmentWithOvershadowedStatus)) { - return false; - } - final SegmentWithOvershadowedStatus 
that = (SegmentWithOvershadowedStatus) o; - if (!dataSegment.equals(that.dataSegment)) { - return false; - } - if (overshadowed != (that.overshadowed)) { + if (o == null || getClass() != o.getClass()) { return false; } - return true; + SegmentPlus that = (SegmentPlus) o; + return overshadowed == that.overshadowed + && Objects.equals(totalTargetReplicants, that.totalTargetReplicants) + && Objects.equals(dataSegment, that.dataSegment); } @Override public int hashCode() { int result = dataSegment.hashCode(); - result = 31 * result + Boolean.hashCode(overshadowed); + result = 31 * result + Objects.hash(overshadowed, totalTargetReplicants); return result; } @Override - public int compareTo(SegmentWithOvershadowedStatus o) + public int compareTo(SegmentPlus o) { return dataSegment.getId().compareTo(o.dataSegment.getId()); } @@ -106,8 +120,9 @@ public int compareTo(SegmentWithOvershadowedStatus o) @Override public String toString() { - return "SegmentWithOvershadowedStatus{" + + return "SegmentPlus{" + "overshadowed=" + overshadowed + + ", totalTargetReplicants=" + totalTargetReplicants + ", dataSegment=" + dataSegment + '}'; } diff --git a/processing/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java b/processing/src/test/java/org/apache/druid/timeline/SegmentPlusTest.java similarity index 78% rename from processing/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java rename to processing/src/test/java/org/apache/druid/timeline/SegmentPlusTest.java index fd239c4d2cce..47287063fa98 100644 --- a/processing/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/SegmentPlusTest.java @@ -40,15 +40,17 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; -public class SegmentWithOvershadowedStatusTest +public class SegmentPlusTest { private static final ObjectMapper MAPPER = createObjectMapper(); 
private static final Interval INTERVAL = Intervals.of("2011-10-01/2011-10-02"); private static final ImmutableMap LOAD_SPEC = ImmutableMap.of("something", "or_other"); private static final boolean OVERSHADOWED = true; + private static final Integer TOTAL_TARGET_REPLICANTS = 2; private static final int TEST_VERSION = 0x9; - private static final SegmentWithOvershadowedStatus SEGMENT = createSegmentWithOvershadowedStatus(); + private static final SegmentPlus SEGMENT = createSegmentPlusForTest(); private static ObjectMapper createObjectMapper() { @@ -59,7 +61,7 @@ private static ObjectMapper createObjectMapper() return objectMapper; } - private static SegmentWithOvershadowedStatus createSegmentWithOvershadowedStatus() + private static SegmentPlus createSegmentPlusForTest() { DataSegment dataSegment = new DataSegment( "something", @@ -74,7 +76,7 @@ private static SegmentWithOvershadowedStatus createSegmentWithOvershadowedStatus 1 ); - return new SegmentWithOvershadowedStatus(dataSegment, OVERSHADOWED); + return new SegmentPlus(dataSegment, OVERSHADOWED, TOTAL_TARGET_REPLICANTS); } @Test @@ -85,7 +87,7 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT ); - Assert.assertEquals(11, objectMap.size()); + Assert.assertEquals(12, objectMap.size()); Assert.assertEquals("something", objectMap.get("dataSource")); Assert.assertEquals(INTERVAL.toString(), objectMap.get("interval")); Assert.assertEquals("1", objectMap.get("version")); @@ -96,12 +98,13 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion")); Assert.assertEquals(1, objectMap.get("size")); Assert.assertEquals(OVERSHADOWED, objectMap.get("overshadowed")); + Assert.assertEquals(TOTAL_TARGET_REPLICANTS, objectMap.get("totalTargetReplicants")); final String json = MAPPER.writeValueAsString(SEGMENT); - final TestSegmentWithOvershadowedStatus 
deserializedSegment = MAPPER.readValue( + final TestSegmentPlus deserializedSegment = MAPPER.readValue( json, - TestSegmentWithOvershadowedStatus.class + TestSegmentPlus.class ); DataSegment dataSegment = SEGMENT.getDataSegment(); @@ -114,30 +117,33 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E Assert.assertEquals(dataSegment.getShardSpec(), deserializedSegment.getShardSpec()); Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize()); Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId()); + Assert.assertEquals(OVERSHADOWED, deserializedSegment.isOvershadowed()); + Assert.assertEquals(TOTAL_TARGET_REPLICANTS, deserializedSegment.getTotalTargetReplicants()); } - // Previously, the implementation of SegmentWithOvershadowedStatus had @JsonCreator/@JsonProperty and @JsonUnwrapped + // Previously, the implementation of SegmentPlus had @JsonCreator/@JsonProperty and @JsonUnwrapped // on the same field (dataSegment), which used to work in Jackson 2.6, but does not work with Jackson 2.9: // https://github.com/FasterXML/jackson-databind/issues/265#issuecomment-264344051 @Test public void testJsonCreatorAndJsonUnwrappedAnnotationsAreCompatible() throws Exception { String json = MAPPER.writeValueAsString(SEGMENT); - SegmentWithOvershadowedStatus segment = MAPPER.readValue(json, SegmentWithOvershadowedStatus.class); + SegmentPlus segment = MAPPER.readValue(json, SegmentPlus.class); Assert.assertEquals(SEGMENT, segment); Assert.assertEquals(json, MAPPER.writeValueAsString(segment)); } } /** - * Subclass of DataSegment with overshadowed status + * Flat subclass of DataSegment for testing */ -class TestSegmentWithOvershadowedStatus extends DataSegment +class TestSegmentPlus extends DataSegment { private final boolean overshadowed; + private final Integer totalTargetReplicants; @JsonCreator - public TestSegmentWithOvershadowedStatus( + public TestSegmentPlus( @JsonProperty("dataSource") String dataSource, 
@JsonProperty("interval") Interval interval, @JsonProperty("version") String version, @@ -154,7 +160,8 @@ public TestSegmentWithOvershadowedStatus( @JsonProperty("lasCompactionState") @Nullable CompactionState lastCompactionState, @JsonProperty("binaryVersion") Integer binaryVersion, @JsonProperty("size") long size, - @JsonProperty("overshadowed") boolean overshadowed + @JsonProperty("overshadowed") boolean overshadowed, + @JsonProperty("totalTargetReplicants") Integer totalTargetReplicants ) { super( @@ -170,6 +177,7 @@ public TestSegmentWithOvershadowedStatus( size ); this.overshadowed = overshadowed; + this.totalTargetReplicants = totalTargetReplicants; } @JsonProperty @@ -178,23 +186,34 @@ public boolean isOvershadowed() return overshadowed; } + @JsonProperty + public Integer getTotalTargetReplicants() + { + return totalTargetReplicants; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof TestSegmentWithOvershadowedStatus)) { + if (o == null || getClass() != o.getClass()) { return false; } if (!super.equals(o)) { return false; } - final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o; - if (overshadowed != (that.overshadowed)) { - return false; - } - return true; + TestSegmentPlus that = (TestSegmentPlus) o; + return overshadowed == that.overshadowed && Objects.equals( + totalTargetReplicants, + that.totalTargetReplicants + ); } + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), overshadowed, totalTargetReplicants); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 8fce69e340b2..a2154b72f004 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -21,6 +21,7 @@ import 
com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; @@ -154,6 +155,7 @@ public class DruidCoordinator private volatile boolean started = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; + private volatile ImmutableMap prevTotalTargetReplicantMap = null; private volatile DruidCluster cluster = null; private int cachedBalancerThreadNumber; @@ -817,6 +819,12 @@ private List makeCompactSegmentsDuty() return ImmutableList.of(compactSegments); } + @Nullable + public Integer getTotalTargetReplicantsForSegment(SegmentId segmentId) + { + return prevTotalTargetReplicantMap == null ? null : prevTotalTargetReplicantMap.get(segmentId); + } + @VisibleForTesting protected class DutiesRunnable implements Runnable { @@ -943,6 +951,11 @@ public void run() } } } + + if (params.getSegmentReplicantLookup() != null) { + prevTotalTargetReplicantMap = params.getSegmentReplicantLookup().createTargetReplicantMapCopy(); + } + // Emit the runtime of the full DutiesRunnable params.getEmitter().emit( new ServiceMetricEvent.Builder() diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java index 3a4bdb9f6274..0ddea2ab15b5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordinator; import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Table; import it.unimi.dsi.fastutil.objects.Object2LongMap; import 
it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; @@ -30,6 +31,7 @@ import java.util.HashMap; import java.util.Map; import java.util.SortedSet; +import java.util.concurrent.ConcurrentHashMap; /** * A lookup for the number of replicants of a given segment for a certain tier. @@ -79,6 +81,7 @@ public static SegmentReplicantLookup make(DruidCluster cluster, boolean replicat private final Table segmentsInCluster; private final Table loadingSegments; + private final ConcurrentHashMap totalTargetReplicants = new ConcurrentHashMap<>(); private final DruidCluster cluster; private SegmentReplicantLookup( @@ -114,6 +117,16 @@ public int getLoadedReplicants(SegmentId segmentId, String tier) return (retVal == null) ? 0 : retVal; } + public void setTotalTargetReplicants(SegmentId segmentId, Integer requiredReplicas) + { + totalTargetReplicants.put(segmentId, requiredReplicas); + } + + public ImmutableMap createTargetReplicantMapCopy() + { + return ImmutableMap.copyOf(totalTargetReplicants); + } + private int getLoadingReplicants(SegmentId segmentId, String tier) { Integer retVal = loadingSegments.get(segmentId, tier); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index 933b63e47263..7777afb54b18 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -77,6 +77,8 @@ public CoordinatorStats run( targetReplicants.putAll(getTieredReplicants()); currentReplicants.putAll(params.getSegmentReplicantLookup().getClusterTiers(segment.getId())); + params.getSegmentReplicantLookup().setTotalTargetReplicants(segment.getId(), getTotalTargetReplicants()); + final CoordinatorStats stats = new CoordinatorStats(); assign(params, segment, stats); @@ -93,6 +95,11 @@ public CoordinatorStats run( } } + private int getTotalTargetReplicants() + { + return 
getTieredReplicants().values().stream().reduce(0, Integer::sum); + } + @Override public boolean canLoadSegments() { diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index ade8ab992e47..6d675aba7ee8 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -32,13 +32,14 @@ import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.server.JettyUtils; +import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.http.security.DatasourceResourceFilter; import org.apache.druid.server.security.AuthorizationUtils; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentWithOvershadowedStatus; +import org.apache.druid.timeline.SegmentPlus; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -69,18 +70,21 @@ public class MetadataResource private final SegmentsMetadataManager segmentsMetadataManager; private final IndexerMetadataStorageCoordinator metadataStorageCoordinator; private final AuthorizerMapper authorizerMapper; + private final DruidCoordinator coordinator; @Inject public MetadataResource( SegmentsMetadataManager segmentsMetadataManager, IndexerMetadataStorageCoordinator metadataStorageCoordinator, AuthorizerMapper authorizerMapper, + DruidCoordinator coordinator, @Json ObjectMapper jsonMapper ) { this.segmentsMetadataManager = segmentsMetadataManager; this.metadataStorageCoordinator = metadataStorageCoordinator; this.authorizerMapper = authorizerMapper; + this.coordinator = coordinator; } @GET @@ -136,11 +140,11 @@ public Response 
getDataSources( public Response getAllUsedSegments( @Context final HttpServletRequest req, @QueryParam("datasources") final @Nullable Set dataSources, - @QueryParam("includeOvershadowedStatus") final @Nullable String includeOvershadowedStatus + @QueryParam("includeFullDetails") final @Nullable String includeFullDetails ) { - if (includeOvershadowedStatus != null) { - return getAllUsedSegmentsWithOvershadowedStatus(req, dataSources); + if (includeFullDetails != null) { + return getAllUsedSegmentsWithAdditionalDetails(req, dataSources); } Collection dataSourcesWithUsedSegments = @@ -165,7 +169,7 @@ public Response getAllUsedSegments( return builder.entity(authorizedSegments).build(); } - private Response getAllUsedSegmentsWithOvershadowedStatus( + private Response getAllUsedSegmentsWithAdditionalDetails( HttpServletRequest req, @Nullable Set dataSources ) @@ -184,15 +188,21 @@ private Response getAllUsedSegmentsWithOvershadowedStatus( .flatMap(t -> t.getSegments().stream()); final Set overshadowedSegments = dataSourcesSnapshot.getOvershadowedSegments(); - final Stream usedSegmentsWithOvershadowedStatus = usedSegments - .map(segment -> new SegmentWithOvershadowedStatus(segment, overshadowedSegments.contains(segment))); + final Stream segmentPlusStream = usedSegments + .map(segment -> + new SegmentPlus( + segment, + overshadowedSegments.contains(segment), + coordinator.getTotalTargetReplicantsForSegment(segment.getId()) + ) + ); - final Function> raGenerator = segment -> Collections + final Function> raGenerator = segment -> Collections .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( req, - usedSegmentsWithOvershadowedStatus::iterator, + segmentPlusStream::iterator, raGenerator, authorizerMapper ); diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 5d97ea394d25..3cb94c999053 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -40,7 +40,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.sql.calcite.planner.SegmentMetadataCacheConfig; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentWithOvershadowedStatus; +import org.apache.druid.timeline.SegmentPlus; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import java.util.Iterator; @@ -76,7 +76,7 @@ public class MetadataSegmentView * from other threads. */ @MonotonicNonNull - private volatile ImmutableSortedSet publishedSegments = null; + private volatile ImmutableSortedSet publishedSegments = null; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -133,27 +133,28 @@ public void stop() private void poll() { log.info("polling published segments from coordinator"); - final JsonParserIterator metadataSegments = getMetadataSegments( + final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, segmentWatcherConfig.getWatchedDataSources() ); - final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder(); + final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder(); while (metadataSegments.hasNext()) { - final SegmentWithOvershadowedStatus segment = metadataSegments.next(); + final SegmentPlus segment = metadataSegments.next(); final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); - final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new 
SegmentWithOvershadowedStatus( + final SegmentPlus segmentPlus = new SegmentPlus( interned, - segment.isOvershadowed() + segment.isOvershadowed(), + segment.getTotalTargetReplicants() ); - builder.add(segmentWithOvershadowedStatus); + builder.add(segmentPlus); } publishedSegments = builder.build(); cachePopulated.countDown(); } - Iterator getPublishedSegments() + Iterator getPublishedSegments() { if (isCacheEnabled) { Uninterruptibles.awaitUninterruptibly(cachePopulated); @@ -168,13 +169,13 @@ Iterator getPublishedSegments() } // Note that coordinator must be up to get segments - private JsonParserIterator getMetadataSegments( + private JsonParserIterator getMetadataSegments( DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper, Set watchedDataSources ) { - String query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus"; + String query = "/druid/coordinator/v1/metadata/segments?includeFullDetails"; if (watchedDataSources != null && !watchedDataSources.isEmpty()) { log.debug( "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources); @@ -183,12 +184,12 @@ private JsonParserIterator getMetadataSegments( sb.append("datasources=").append(ds).append("&"); } sb.setLength(sb.length() - 1); - query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb; + query = "/druid/coordinator/v1/metadata/segments?includeFullDetails&" + sb; } return SystemSchema.getThingsFromLeaderNode( query, - new TypeReference() + new TypeReference() { }, coordinatorClient, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 3e6cb3e58f20..c4818d351017 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -81,7 +81,7 @@ import org.apache.druid.sql.calcite.table.RowSignatures; 
import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentWithOvershadowedStatus; +import org.apache.druid.timeline.SegmentPlus; import org.jboss.netty.handler.codec.http.HttpMethod; import javax.annotation.Nullable; @@ -106,7 +106,7 @@ public class SystemSchema extends AbstractSchema private static final String TASKS_TABLE = "tasks"; private static final String SUPERVISOR_TABLE = "supervisors"; - private static final Function> + private static final Function> SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment -> Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply( segment.getDataSegment().getDataSource()) @@ -129,6 +129,7 @@ public class SystemSchema extends AbstractSchema private static final long IS_AVAILABLE_TRUE = 1L; private static final long IS_OVERSHADOWED_FALSE = 0L; private static final long IS_OVERSHADOWED_TRUE = 1L; + private static final long TARGET_NUM_REPLICAS_UNKNOWN = -1L; static final RowSignature SEGMENTS_SIGNATURE = RowSignature .builder() @@ -150,6 +151,7 @@ public class SystemSchema extends AbstractSchema .add("dimensions", ColumnType.STRING) .add("metrics", ColumnType.STRING) .add("last_compaction_state", ColumnType.STRING) + .add("target_num_replicas", ColumnType.LONG) .build(); static final RowSignature SERVERS_SIGNATURE = RowSignature @@ -288,7 +290,7 @@ public Enumerable scan(DataContext root) // Get published segments from metadata segment cache (if enabled in SQL planner config), else directly from // Coordinator. - final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); + final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); final Set segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.cache().getTotalSegments()); @@ -326,7 +328,8 @@ public Enumerable scan(DataContext root) segment.getShardSpec() == null ? 
null : jsonMapper.writeValueAsString(segment.getShardSpec()), segment.getDimensions() == null ? null : jsonMapper.writeValueAsString(segment.getDimensions()), segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()), - segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState()) + segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState()), + val.getTotalTargetReplicants() == null ? TARGET_NUM_REPLICAS_UNKNOWN : (long) val.getTotalTargetReplicants() }; } catch (JsonProcessingException e) { @@ -368,7 +371,8 @@ public Enumerable scan(DataContext root) val.getValue().getSegment().getShardSpec() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()), val.getValue().getSegment().getDimensions() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()), val.getValue().getSegment().getMetrics() == null ? 
null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()), - null // unpublished segments from realtime tasks will not be compacted yet + null, // unpublished segments from realtime tasks will not be compacted yet + TARGET_NUM_REPLICAS_UNKNOWN }; } catch (JsonProcessingException e) { @@ -384,8 +388,8 @@ public Enumerable scan(DataContext root) } - private Iterator getAuthorizedPublishedSegments( - Iterator it, + private Iterator getAuthorizedPublishedSegments( + Iterator it, DataContext root ) { @@ -394,7 +398,7 @@ private Iterator getAuthorizedPublishedSegments( "authenticationResult in dataContext" ); - final Iterable authorizedSegments = AuthorizationUtils + final Iterable authorizedSegments = AuthorizationUtils .filterAuthorizedResources( authenticationResult, () -> it, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 50d0f09a85e5..9a21024ca0ff 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -99,7 +99,7 @@ import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentWithOvershadowedStatus; +import org.apache.druid.timeline.SegmentPlus; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; @@ -542,7 +542,7 @@ public void testGetTableMap() final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl()); final List fields = rowType.getFieldList(); - Assert.assertEquals(18, fields.size()); + Assert.assertEquals(19, fields.size()); final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks"); final RelDataType sysRowType = 
tasksTable.getRowType(new JavaTypeFactoryImpl()); @@ -564,12 +564,12 @@ public void testGetTableMap() public void testSegmentsTable() throws Exception { final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper); - final Set publishedSegments = new HashSet<>(Arrays.asList( - new SegmentWithOvershadowedStatus(publishedCompactedSegment1, true), - new SegmentWithOvershadowedStatus(publishedCompactedSegment2, false), - new SegmentWithOvershadowedStatus(publishedUncompactedSegment3, false), - new SegmentWithOvershadowedStatus(segment1, true), - new SegmentWithOvershadowedStatus(segment2, false) + final Set publishedSegments = new HashSet<>(Arrays.asList(// HERE + new SegmentPlus(publishedCompactedSegment1, true, 2), + new SegmentPlus(publishedCompactedSegment2, false, 0), + new SegmentPlus(publishedUncompactedSegment3, false, 2), + new SegmentPlus(segment1, true, 2), + new SegmentPlus(segment2, false, 0) )); EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once(); @@ -598,7 +598,8 @@ public void testSegmentsTable() throws Exception 1L, //is_available 0L, //is_realtime 1L, //is_overshadowed - null //is_compacted + null, //is_compacted + 2L // targetReplicas ); verifyRow( @@ -612,7 +613,8 @@ public void testSegmentsTable() throws Exception 1L, //is_available 0L, //is_realtime 0L, //is_overshadowed, - null //is_compacted + null, //is_compacted + 0L // target_replicas ); //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2 @@ -627,7 +629,8 @@ public void testSegmentsTable() throws Exception 1L, //is_available 0L, //is_realtime 0L, //is_overshadowed - null //is_compacted + null, //is_compacted + -1L // target_replicas ); verifyRow( @@ -641,7 +644,8 @@ public void testSegmentsTable() throws Exception 1L, //is_available 1L, //is_realtime 0L, //is_overshadowed - null //is_compacted + null, //is_compacted + -1L // target_replicas ); verifyRow( @@ -655,7 
+659,8 @@ public void testSegmentsTable() throws Exception 1L, //is_available 1L, //is_realtime 0L, //is_overshadowed - null //is_compacted + null, //is_compacted + -1L // target_replicas ); // wikipedia segments are published and unavailable, num_replicas is 0 @@ -671,7 +676,8 @@ public void testSegmentsTable() throws Exception 0L, //is_available 0L, //is_realtime 1L, //is_overshadowed - expectedCompactionState //is_compacted + expectedCompactionState, //is_compacted + 2L // target_replicas ); verifyRow( @@ -685,7 +691,8 @@ public void testSegmentsTable() throws Exception 0L, //is_available 0L, //is_realtime 0L, //is_overshadowed - expectedCompactionState //is_compacted + expectedCompactionState, //is_compacted + 0L // target_replicas ); verifyRow( @@ -699,7 +706,8 @@ public void testSegmentsTable() throws Exception 0L, //is_available 0L, //is_realtime 0L, //is_overshadowed - null //is_compacted + null, //is_compacted + 2L // target_replicas ); // Verify value types. @@ -717,7 +725,8 @@ private void verifyRow( long isAvailable, long isRealtime, long isOvershadowed, - CompactionState compactionState + CompactionState compactionState, + long targetReplicas ) throws Exception { Assert.assertEquals(segmentId, row[0].toString()); @@ -740,6 +749,7 @@ private void verifyRow( } else { Assert.assertEquals(mapper.writeValueAsString(compactionState), row[17]); } + Assert.assertEquals(targetReplicas, row[18]); } @Test From a397e18eac8fe66133fbb79d4aaba9c12c9a8974 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 9 Jun 2023 19:39:14 +0530 Subject: [PATCH 02/18] Remove old comment --- .../org/apache/druid/sql/calcite/schema/SystemSchemaTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 9a21024ca0ff..9ab99b728209 100644 --- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -564,7 +564,7 @@ public void testGetTableMap() public void testSegmentsTable() throws Exception { final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper); - final Set publishedSegments = new HashSet<>(Arrays.asList(// HERE + final Set publishedSegments = new HashSet<>(Arrays.asList( new SegmentPlus(publishedCompactedSegment1, true, 2), new SegmentPlus(publishedCompactedSegment2, false, 0), new SegmentPlus(publishedUncompactedSegment3, false, 2), From b431b2936acc98ca30c2c946ba86b22a4449f325 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sat, 10 Jun 2023 17:55:07 +0530 Subject: [PATCH 03/18] Address review comments --- docs/api-reference/api-reference.md | 8 ++-- ...tPlus.java => SegmentStatusInCluster.java} | 40 +++++++++--------- ...t.java => SegmentStatusInClusterTest.java} | 42 +++++++++---------- .../server/coordinator/DruidCoordinator.java | 17 +++++--- .../coordinator/SegmentReplicantLookup.java | 10 ++--- .../server/coordinator/rules/LoadRule.java | 4 +- .../druid/server/http/MetadataResource.java | 18 ++++---- .../calcite/schema/MetadataSegmentView.java | 26 ++++++------ .../sql/calcite/schema/SystemSchema.java | 21 +++++----- .../sql/calcite/schema/SystemSchemaTest.java | 14 +++---- 10 files changed, 103 insertions(+), 97 deletions(-) rename processing/src/main/java/org/apache/druid/timeline/{SegmentPlus.java => SegmentStatusInCluster.java} (70%) rename processing/src/test/java/org/apache/druid/timeline/{SegmentPlusTest.java => SegmentStatusInClusterTest.java} (86%) diff --git a/docs/api-reference/api-reference.md b/docs/api-reference/api-reference.md index 0e199fc03c18..2a6c36deae13 100644 --- a/docs/api-reference/api-reference.md +++ b/docs/api-reference/api-reference.md @@ -171,13 +171,13 @@ Returns a list of all segments for each 
datasource enabled in the cluster. Returns a list of all segments for one or more specific datasources enabled in the cluster. -`GET /druid/coordinator/v1/metadata/segments?includeFullDetails` +`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus` -Returns a list of all segments for each datasource with the full segment metadata and extra fields `overshadowed` and `totalTargetReplicants`. +Returns a list of all segments for each datasource with the full segment metadata and extra fields `overshadowed` and `replicationFactor`. -`GET /druid/coordinator/v1/metadata/segments?includeFullDetails&datasources={dataSourceName1}&datasources={dataSourceName2}` +`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&datasources={dataSourceName1}&datasources={dataSourceName2}` -Returns a list of all segments for one or more specific datasources with the full segment metadata and extra fields `overshadowed` and `totalTargetReplicants`. +Returns a list of all segments for one or more specific datasources with the full segment metadata and extra fields `overshadowed` and `replicationFactor`. `GET /druid/coordinator/v1/metadata/datasources` diff --git a/processing/src/main/java/org/apache/druid/timeline/SegmentPlus.java b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java similarity index 70% rename from processing/src/main/java/org/apache/druid/timeline/SegmentPlus.java rename to processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java index 57d73d60fd2b..c243e09a23b0 100644 --- a/processing/src/main/java/org/apache/druid/timeline/SegmentPlus.java +++ b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java @@ -29,44 +29,44 @@ /** * DataSegment object plus the overshadowed and target number of replicants for the segment. An immutable object. *

- * SegmentPlus's {@link #compareTo} method considers only the {@link SegmentId} + * SegmentStatusInCluster's {@link #compareTo} method considers only the {@link SegmentId} * of the DataSegment object. */ -public class SegmentPlus implements Comparable +public class SegmentStatusInCluster implements Comparable { private final boolean overshadowed; /** - * The target number of replicants for the segment added across all tiers. This value is null if the load rules for + * The target replication factor for the segment added across all tiers. This value is null if the load rules for * the segment have not been evaluated yet. */ - private final Integer totalTargetReplicants; + private final Integer replicationFactor; /** * dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of - * enclosing class. If in the future, if {@code SegmentPlus} were to extend {@link DataSegment}, + * enclosing class. In the future, if {@code SegmentStatusInCluster} were to extend {@link DataSegment}, * there will be no change in the serialized format.
*/ @JsonUnwrapped private final DataSegment dataSegment; @JsonCreator - public SegmentPlus( + public SegmentStatusInCluster( @JsonProperty("overshadowed") boolean overshadowed, - @JsonProperty("totalTargetReplicants") Integer targetReplicants + @JsonProperty("replicationFactor") @Nullable Integer replicationFactor ) { // Jackson will overwrite dataSegment if needed (even though the field is final) - this(null, overshadowed, targetReplicants); + this(null, overshadowed, replicationFactor); } - public SegmentPlus( + public SegmentStatusInCluster( DataSegment dataSegment, boolean overshadowed, - Integer totalTargetReplicants + Integer replicationFactor ) { this.dataSegment = dataSegment; this.overshadowed = overshadowed; - this.totalTargetReplicants = totalTargetReplicants; + this.replicationFactor = replicationFactor; } @JsonProperty @@ -83,9 +83,9 @@ public DataSegment getDataSegment() @Nullable @JsonProperty - public Integer getTotalTargetReplicants() + public Integer getReplicationFactor() { - return totalTargetReplicants; + return replicationFactor; } @Override @@ -97,22 +97,20 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - SegmentPlus that = (SegmentPlus) o; + SegmentStatusInCluster that = (SegmentStatusInCluster) o; return overshadowed == that.overshadowed - && Objects.equals(totalTargetReplicants, that.totalTargetReplicants) + && Objects.equals(replicationFactor, that.replicationFactor) && Objects.equals(dataSegment, that.dataSegment); } @Override public int hashCode() { - int result = dataSegment.hashCode(); - result = 31 * result + Objects.hash(overshadowed, totalTargetReplicants); - return result; + return Objects.hash(overshadowed, replicationFactor, dataSegment); } @Override - public int compareTo(SegmentPlus o) + public int compareTo(SegmentStatusInCluster o) { return dataSegment.getId().compareTo(o.dataSegment.getId()); } @@ -120,9 +118,9 @@ public int compareTo(SegmentPlus o) @Override public String 
toString() { - return "SegmentPlus{" + + return "SegmentStatusInCluster{" + "overshadowed=" + overshadowed + - ", totalTargetReplicants=" + totalTargetReplicants + + ", replicationFactor=" + replicationFactor + ", dataSegment=" + dataSegment + '}'; } diff --git a/processing/src/test/java/org/apache/druid/timeline/SegmentPlusTest.java b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java similarity index 86% rename from processing/src/test/java/org/apache/druid/timeline/SegmentPlusTest.java rename to processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java index 47287063fa98..acdec9255006 100644 --- a/processing/src/test/java/org/apache/druid/timeline/SegmentPlusTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java @@ -42,7 +42,7 @@ import java.util.Map; import java.util.Objects; -public class SegmentPlusTest +public class SegmentStatusInClusterTest { private static final ObjectMapper MAPPER = createObjectMapper(); private static final Interval INTERVAL = Intervals.of("2011-10-01/2011-10-02"); @@ -50,7 +50,7 @@ public class SegmentPlusTest private static final boolean OVERSHADOWED = true; private static final Integer TOTAL_TARGET_REPLICANTS = 2; private static final int TEST_VERSION = 0x9; - private static final SegmentPlus SEGMENT = createSegmentPlusForTest(); + private static final SegmentStatusInCluster SEGMENT = createSegmentForTest(); private static ObjectMapper createObjectMapper() { @@ -61,7 +61,7 @@ private static ObjectMapper createObjectMapper() return objectMapper; } - private static SegmentPlus createSegmentPlusForTest() + private static SegmentStatusInCluster createSegmentForTest() { DataSegment dataSegment = new DataSegment( "something", @@ -76,7 +76,7 @@ private static SegmentPlus createSegmentPlusForTest() 1 ); - return new SegmentPlus(dataSegment, OVERSHADOWED, TOTAL_TARGET_REPLICANTS); + return new SegmentStatusInCluster(dataSegment, OVERSHADOWED, 
TOTAL_TARGET_REPLICANTS); } @Test @@ -98,13 +98,13 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion")); Assert.assertEquals(1, objectMap.get("size")); Assert.assertEquals(OVERSHADOWED, objectMap.get("overshadowed")); - Assert.assertEquals(TOTAL_TARGET_REPLICANTS, objectMap.get("totalTargetReplicants")); + Assert.assertEquals(TOTAL_TARGET_REPLICANTS, objectMap.get("replicationFactor")); final String json = MAPPER.writeValueAsString(SEGMENT); - final TestSegmentPlus deserializedSegment = MAPPER.readValue( + final TestSegment deserializedSegment = MAPPER.readValue( json, - TestSegmentPlus.class + TestSegment.class ); DataSegment dataSegment = SEGMENT.getDataSegment(); @@ -118,17 +118,17 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize()); Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId()); Assert.assertEquals(OVERSHADOWED, deserializedSegment.isOvershadowed()); - Assert.assertEquals(TOTAL_TARGET_REPLICANTS, deserializedSegment.getTotalTargetReplicants()); + Assert.assertEquals(TOTAL_TARGET_REPLICANTS, deserializedSegment.getReplicationFactor()); } - // Previously, the implementation of SegmentPlus had @JsonCreator/@JsonProperty and @JsonUnwrapped + // Previously, the implementation of SegmentStatusInCluster had @JsonCreator/@JsonProperty and @JsonUnwrapped // on the same field (dataSegment), which used to work in Jackson 2.6, but does not work with Jackson 2.9: // https://github.com/FasterXML/jackson-databind/issues/265#issuecomment-264344051 @Test public void testJsonCreatorAndJsonUnwrappedAnnotationsAreCompatible() throws Exception { String json = MAPPER.writeValueAsString(SEGMENT); - SegmentPlus segment = MAPPER.readValue(json, SegmentPlus.class); + SegmentStatusInCluster segment = MAPPER.readValue(json, SegmentStatusInCluster.class); 
Assert.assertEquals(SEGMENT, segment); Assert.assertEquals(json, MAPPER.writeValueAsString(segment)); } @@ -137,13 +137,13 @@ public void testJsonCreatorAndJsonUnwrappedAnnotationsAreCompatible() throws Exc /** * Flat subclass of DataSegment for testing */ -class TestSegmentPlus extends DataSegment +class TestSegment extends DataSegment { private final boolean overshadowed; - private final Integer totalTargetReplicants; + private final Integer replicationFactor; @JsonCreator - public TestSegmentPlus( + public TestSegment( @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("version") String version, @@ -161,7 +161,7 @@ public TestSegmentPlus( @JsonProperty("binaryVersion") Integer binaryVersion, @JsonProperty("size") long size, @JsonProperty("overshadowed") boolean overshadowed, - @JsonProperty("totalTargetReplicants") Integer totalTargetReplicants + @JsonProperty("replicationFactor") Integer replicationFactor ) { super( @@ -177,7 +177,7 @@ public TestSegmentPlus( size ); this.overshadowed = overshadowed; - this.totalTargetReplicants = totalTargetReplicants; + this.replicationFactor = replicationFactor; } @JsonProperty @@ -187,9 +187,9 @@ public boolean isOvershadowed() } @JsonProperty - public Integer getTotalTargetReplicants() + public Integer getReplicationFactor() { - return totalTargetReplicants; + return replicationFactor; } @Override @@ -204,16 +204,16 @@ public boolean equals(Object o) if (!super.equals(o)) { return false; } - TestSegmentPlus that = (TestSegmentPlus) o; + TestSegment that = (TestSegment) o; return overshadowed == that.overshadowed && Objects.equals( - totalTargetReplicants, - that.totalTargetReplicants + replicationFactor, + that.replicationFactor ); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), overshadowed, totalTargetReplicants); + return Objects.hash(super.hashCode(), overshadowed, replicationFactor); } } diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index a2154b72f004..bbed09147585 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; @@ -155,7 +154,13 @@ public class DruidCoordinator private volatile boolean started = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; - private volatile ImmutableMap prevTotalTargetReplicantMap = null; + + /** + * Contains a map of segmentId to total replication factor across all tiers. This map is refreshed when load rules are + * evaluated. It is used by {@link DruidCoordinator} to supply this value to + * {@link org.apache.druid.server.http.MetadataResource}. + */ + private volatile Map segmentIdToReplicationFactor = null; private volatile DruidCluster cluster = null; private int cachedBalancerThreadNumber; @@ -820,9 +825,9 @@ private List makeCompactSegmentsDuty() } @Nullable - public Integer getTotalTargetReplicantsForSegment(SegmentId segmentId) + public Integer getReplicationFactorForSegment(SegmentId segmentId) { - return prevTotalTargetReplicantMap == null ? null : prevTotalTargetReplicantMap.get(segmentId); + return segmentIdToReplicationFactor == null ? null : segmentIdToReplicationFactor.get(segmentId); } @VisibleForTesting @@ -952,8 +957,10 @@ public void run() } } + // Update the immutable replication factor map with latest values. + // This value is set here as it is recalculated during load rule evaluation.
if (params.getSegmentReplicantLookup() != null) { - prevTotalTargetReplicantMap = params.getSegmentReplicantLookup().createTargetReplicantMapCopy(); + segmentIdToReplicationFactor = params.getSegmentReplicantLookup().getSegmentIdToReplicationFactor(); } // Emit the runtime of the full DutiesRunnable diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java index 0ddea2ab15b5..988380ae0852 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java @@ -81,7 +81,7 @@ public static SegmentReplicantLookup make(DruidCluster cluster, boolean replicat private final Table segmentsInCluster; private final Table loadingSegments; - private final ConcurrentHashMap totalTargetReplicants = new ConcurrentHashMap<>(); + private final ConcurrentHashMap replicationFactorMap = new ConcurrentHashMap<>(); private final DruidCluster cluster; private SegmentReplicantLookup( @@ -117,14 +117,14 @@ public int getLoadedReplicants(SegmentId segmentId, String tier) return (retVal == null) ? 
0 : retVal; } - public void setTotalTargetReplicants(SegmentId segmentId, Integer requiredReplicas) + public void setReplicationFactor(SegmentId segmentId, Integer requiredReplicas) { - totalTargetReplicants.put(segmentId, requiredReplicas); + replicationFactorMap.put(segmentId, requiredReplicas); } - public ImmutableMap createTargetReplicantMapCopy() + public Map getSegmentIdToReplicationFactor() { - return ImmutableMap.copyOf(totalTargetReplicants); + return ImmutableMap.copyOf(replicationFactorMap); } private int getLoadingReplicants(SegmentId segmentId, String tier) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index 7777afb54b18..c89a74edbff8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -77,7 +77,7 @@ public CoordinatorStats run( targetReplicants.putAll(getTieredReplicants()); currentReplicants.putAll(params.getSegmentReplicantLookup().getClusterTiers(segment.getId())); - params.getSegmentReplicantLookup().setTotalTargetReplicants(segment.getId(), getTotalTargetReplicants()); + params.getSegmentReplicantLookup().setReplicationFactor(segment.getId(), getReplicationFactor()); final CoordinatorStats stats = new CoordinatorStats(); assign(params, segment, stats); @@ -95,7 +95,7 @@ public CoordinatorStats run( } } - private int getTotalTargetReplicants() + private int getReplicationFactor() { return getTieredReplicants().values().stream().reduce(0, Integer::sum); } diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 6d675aba7ee8..9f2ca8b37f62 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ 
-39,7 +39,7 @@ import org.apache.druid.server.security.ResourceAction; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentPlus; +import org.apache.druid.timeline.SegmentStatusInCluster; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -140,10 +140,10 @@ public Response getDataSources( public Response getAllUsedSegments( @Context final HttpServletRequest req, @QueryParam("datasources") final @Nullable Set dataSources, - @QueryParam("includeFullDetails") final @Nullable String includeFullDetails + @QueryParam("includeOvershadowedStatus") final @Nullable String includeOvershadowedStatus ) { - if (includeFullDetails != null) { + if (includeOvershadowedStatus != null) { return getAllUsedSegmentsWithAdditionalDetails(req, dataSources); } @@ -188,21 +188,21 @@ private Response getAllUsedSegmentsWithAdditionalDetails( .flatMap(t -> t.getSegments().stream()); final Set overshadowedSegments = dataSourcesSnapshot.getOvershadowedSegments(); - final Stream segmentPlusStream = usedSegments + final Stream segmentStatus = usedSegments .map(segment -> - new SegmentPlus( + new SegmentStatusInCluster( segment, overshadowedSegments.contains(segment), - coordinator.getTotalTargetReplicantsForSegment(segment.getId()) + coordinator.getReplicationFactorForSegment(segment.getId()) ) ); - final Function> raGenerator = segment -> Collections + final Function> raGenerator = segment -> Collections .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( req, - segmentPlusStream::iterator, + segmentStatus::iterator, raGenerator, authorizerMapper ); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 3cb94c999053..7b8a0b3e9dc7 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -40,7 +40,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.sql.calcite.planner.SegmentMetadataCacheConfig; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentPlus; +import org.apache.druid.timeline.SegmentStatusInCluster; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import java.util.Iterator; @@ -76,7 +76,7 @@ public class MetadataSegmentView * from other threads. */ @MonotonicNonNull - private volatile ImmutableSortedSet publishedSegments = null; + private volatile ImmutableSortedSet publishedSegments = null; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ -133,28 +133,28 @@ public void stop() private void poll() { log.info("polling published segments from coordinator"); - final JsonParserIterator metadataSegments = getMetadataSegments( + final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, segmentWatcherConfig.getWatchedDataSources() ); - final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder(); + final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder(); while (metadataSegments.hasNext()) { - final SegmentPlus segment = metadataSegments.next(); + final SegmentStatusInCluster segment = metadataSegments.next(); final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); - final SegmentPlus segmentPlus = new SegmentPlus( + final SegmentStatusInCluster segmentStatusInCluster = new SegmentStatusInCluster( interned, segment.isOvershadowed(), - 
segment.getTotalTargetReplicants() + segment.getReplicationFactor() ); - builder.add(segmentPlus); + builder.add(segmentStatusInCluster); } publishedSegments = builder.build(); cachePopulated.countDown(); } - Iterator getPublishedSegments() + Iterator getPublishedSegments() { if (isCacheEnabled) { Uninterruptibles.awaitUninterruptibly(cachePopulated); @@ -169,13 +169,13 @@ Iterator getPublishedSegments() } // Note that coordinator must be up to get segments - private JsonParserIterator getMetadataSegments( + private JsonParserIterator getMetadataSegments( DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper, Set watchedDataSources ) { - String query = "/druid/coordinator/v1/metadata/segments?includeFullDetails"; + String query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus"; if (watchedDataSources != null && !watchedDataSources.isEmpty()) { log.debug( "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources); @@ -184,12 +184,12 @@ private JsonParserIterator getMetadataSegments( sb.append("datasources=").append(ds).append("&"); } sb.setLength(sb.length() - 1); - query = "/druid/coordinator/v1/metadata/segments?includeFullDetails&" + sb; + query = "/druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&" + sb; } return SystemSchema.getThingsFromLeaderNode( query, - new TypeReference() + new TypeReference() { }, coordinatorClient, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index c4818d351017..78ad23ebaddb 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -81,7 +81,7 @@ import org.apache.druid.sql.calcite.table.RowSignatures; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import 
org.apache.druid.timeline.SegmentPlus; +import org.apache.druid.timeline.SegmentStatusInCluster; import org.jboss.netty.handler.codec.http.HttpMethod; import javax.annotation.Nullable; @@ -106,7 +106,7 @@ public class SystemSchema extends AbstractSchema private static final String TASKS_TABLE = "tasks"; private static final String SUPERVISOR_TABLE = "supervisors"; - private static final Function> + private static final Function> SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment -> Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply( segment.getDataSegment().getDataSource()) @@ -117,6 +117,8 @@ public class SystemSchema extends AbstractSchema segment.getDataSource()) ); + private static final long REPLICATION_FACTOR_UNKNOWN = -1L; + /** * Booleans constants represented as long type, * where 1 = true and 0 = false to make it easy to count number of segments @@ -129,7 +131,6 @@ public class SystemSchema extends AbstractSchema private static final long IS_AVAILABLE_TRUE = 1L; private static final long IS_OVERSHADOWED_FALSE = 0L; private static final long IS_OVERSHADOWED_TRUE = 1L; - private static final long TARGET_NUM_REPLICAS_UNKNOWN = -1L; static final RowSignature SEGMENTS_SIGNATURE = RowSignature .builder() @@ -151,7 +152,7 @@ public class SystemSchema extends AbstractSchema .add("dimensions", ColumnType.STRING) .add("metrics", ColumnType.STRING) .add("last_compaction_state", ColumnType.STRING) - .add("target_num_replicas", ColumnType.LONG) + .add("replication_factor", ColumnType.LONG) .build(); static final RowSignature SERVERS_SIGNATURE = RowSignature @@ -290,7 +291,7 @@ public Enumerable scan(DataContext root) // Get published segments from metadata segment cache (if enabled in SQL planner config), else directly from // Coordinator. 
- final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); + final Iterator metadataStoreSegments = metadataView.getPublishedSegments(); final Set segmentsAlreadySeen = Sets.newHashSetWithExpectedSize(druidSchema.cache().getTotalSegments()); @@ -329,7 +330,7 @@ public Enumerable scan(DataContext root) segment.getDimensions() == null ? null : jsonMapper.writeValueAsString(segment.getDimensions()), segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()), segment.getLastCompactionState() == null ? null : jsonMapper.writeValueAsString(segment.getLastCompactionState()), - val.getTotalTargetReplicants() == null ? TARGET_NUM_REPLICAS_UNKNOWN : (long) val.getTotalTargetReplicants() + val.getReplicationFactor() == null ? REPLICATION_FACTOR_UNKNOWN : (long) val.getReplicationFactor() }; } catch (JsonProcessingException e) { @@ -372,7 +373,7 @@ public Enumerable scan(DataContext root) val.getValue().getSegment().getDimensions() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()), val.getValue().getSegment().getMetrics() == null ? 
null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()), null, // unpublished segments from realtime tasks will not be compacted yet - TARGET_NUM_REPLICAS_UNKNOWN + REPLICATION_FACTOR_UNKNOWN }; } catch (JsonProcessingException e) { @@ -388,8 +389,8 @@ public Enumerable scan(DataContext root) } - private Iterator getAuthorizedPublishedSegments( - Iterator it, + private Iterator getAuthorizedPublishedSegments( + Iterator it, DataContext root ) { @@ -398,7 +399,7 @@ private Iterator getAuthorizedPublishedSegments( "authenticationResult in dataContext" ); - final Iterable authorizedSegments = AuthorizationUtils + final Iterable authorizedSegments = AuthorizationUtils .filterAuthorizedResources( authenticationResult, () -> it, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 9ab99b728209..b7bd27d064a5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -99,7 +99,7 @@ import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentPlus; +import org.apache.druid.timeline.SegmentStatusInCluster; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; @@ -564,12 +564,12 @@ public void testGetTableMap() public void testSegmentsTable() throws Exception { final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, new ObjectMapper(), authMapper); - final Set publishedSegments = new HashSet<>(Arrays.asList( - new SegmentPlus(publishedCompactedSegment1, true, 2), - new SegmentPlus(publishedCompactedSegment2, false, 0), - new SegmentPlus(publishedUncompactedSegment3, false, 2), - 
new SegmentPlus(segment1, true, 2), - new SegmentPlus(segment2, false, 0) + final Set publishedSegments = new HashSet<>(Arrays.asList( + new SegmentStatusInCluster(publishedCompactedSegment1, true, 2), + new SegmentStatusInCluster(publishedCompactedSegment2, false, 0), + new SegmentStatusInCluster(publishedUncompactedSegment3, false, 2), + new SegmentStatusInCluster(segment1, true, 2), + new SegmentStatusInCluster(segment2, false, 0) )); EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once(); From 8f6a47032d972c717e7436f9bf2ac7c23071e8c6 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 12 Jun 2023 08:31:18 +0530 Subject: [PATCH 04/18] Address review comments --- .../timeline/SegmentStatusInCluster.java | 2 +- .../timeline/SegmentStatusInClusterTest.java | 8 ++++---- .../sql/calcite/schema/SystemSchema.java | 4 ++-- .../sql/calcite/schema/SystemSchemaTest.java | 20 +++++++++---------- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java index c243e09a23b0..a673e6e39dcd 100644 --- a/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java +++ b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java @@ -36,7 +36,7 @@ public class SegmentStatusInCluster implements Comparable LOAD_SPEC = ImmutableMap.of("something", "or_other"); private static final boolean OVERSHADOWED = true; - private static final Integer TOTAL_TARGET_REPLICANTS = 2; + private static final Integer TOTAL_REPLICATION_FACTOR = 2; private static final int TEST_VERSION = 0x9; private static final SegmentStatusInCluster SEGMENT = createSegmentForTest(); @@ -76,7 +76,7 @@ private static SegmentStatusInCluster createSegmentForTest() 1 ); - return new SegmentStatusInCluster(dataSegment, OVERSHADOWED, TOTAL_TARGET_REPLICANTS); + return new 
SegmentStatusInCluster(dataSegment, OVERSHADOWED, TOTAL_REPLICATION_FACTOR); } @Test @@ -98,7 +98,7 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion")); Assert.assertEquals(1, objectMap.get("size")); Assert.assertEquals(OVERSHADOWED, objectMap.get("overshadowed")); - Assert.assertEquals(TOTAL_TARGET_REPLICANTS, objectMap.get("replicationFactor")); + Assert.assertEquals(TOTAL_REPLICATION_FACTOR, objectMap.get("replicationFactor")); final String json = MAPPER.writeValueAsString(SEGMENT); @@ -118,7 +118,7 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize()); Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId()); Assert.assertEquals(OVERSHADOWED, deserializedSegment.isOvershadowed()); - Assert.assertEquals(TOTAL_TARGET_REPLICANTS, deserializedSegment.getReplicationFactor()); + Assert.assertEquals(TOTAL_REPLICATION_FACTOR, deserializedSegment.getReplicationFactor()); } // Previously, the implementation of SegmentStatusInCluster had @JsonCreator/@JsonProperty and @JsonUnwrapped diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 78ad23ebaddb..9e0c9b905384 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -107,7 +107,7 @@ public class SystemSchema extends AbstractSchema private static final String SUPERVISOR_TABLE = "supervisors"; private static final Function> - SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment -> + SEGMENT_STATUS_IN_CLUSTER_RA_GENERATOR = segment -> Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply( segment.getDataSegment().getDataSource()) ); @@ -403,7 +403,7 @@ private 
Iterator getAuthorizedPublishedSegments( .filterAuthorizedResources( authenticationResult, () -> it, - SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR, + SEGMENT_STATUS_IN_CLUSTER_RA_GENERATOR, authorizerMapper ); return authorizedSegments.iterator(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index b7bd27d064a5..511a76b18add 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -599,7 +599,7 @@ public void testSegmentsTable() throws Exception 0L, //is_realtime 1L, //is_overshadowed null, //is_compacted - 2L // targetReplicas + 2L // replication_factor ); verifyRow( @@ -614,7 +614,7 @@ public void testSegmentsTable() throws Exception 0L, //is_realtime 0L, //is_overshadowed, null, //is_compacted - 0L // target_replicas + 0L // replication_factor ); //segment test3 is unpublished and has a NumberedShardSpec with partitionNum = 2 @@ -630,7 +630,7 @@ public void testSegmentsTable() throws Exception 0L, //is_realtime 0L, //is_overshadowed null, //is_compacted - -1L // target_replicas + -1L // replication_factor ); verifyRow( @@ -645,7 +645,7 @@ public void testSegmentsTable() throws Exception 1L, //is_realtime 0L, //is_overshadowed null, //is_compacted - -1L // target_replicas + -1L // replication_factor ); verifyRow( @@ -660,7 +660,7 @@ public void testSegmentsTable() throws Exception 1L, //is_realtime 0L, //is_overshadowed null, //is_compacted - -1L // target_replicas + -1L // replication_factor ); // wikipedia segments are published and unavailable, num_replicas is 0 @@ -677,7 +677,7 @@ public void testSegmentsTable() throws Exception 0L, //is_realtime 1L, //is_overshadowed expectedCompactionState, //is_compacted - 2L // target_replicas + 2L // replication_factor ); verifyRow( @@ -692,7 +692,7 @@ public void testSegmentsTable() 
throws Exception 0L, //is_realtime 0L, //is_overshadowed expectedCompactionState, //is_compacted - 0L // target_replicas + 0L // replication_factor ); verifyRow( @@ -707,7 +707,7 @@ public void testSegmentsTable() throws Exception 0L, //is_realtime 0L, //is_overshadowed null, //is_compacted - 2L // target_replicas + 2L // replication_factor ); // Verify value types. @@ -726,7 +726,7 @@ private void verifyRow( long isRealtime, long isOvershadowed, CompactionState compactionState, - long targetReplicas + long replicationFactor ) throws Exception { Assert.assertEquals(segmentId, row[0].toString()); @@ -749,7 +749,7 @@ private void verifyRow( } else { Assert.assertEquals(mapper.writeValueAsString(compactionState), row[17]); } - Assert.assertEquals(targetReplicas, row[18]); + Assert.assertEquals(replicationFactor, row[18]); } @Test From 10d14ecb7b677bf445ca0bc09de2db38157fd4ab Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 12 Jun 2023 10:21:47 +0530 Subject: [PATCH 05/18] Update comments --- .../org/apache/druid/timeline/SegmentStatusInCluster.java | 2 +- .../apache/druid/timeline/SegmentStatusInClusterTest.java | 8 ++++---- .../apache/druid/server/coordinator/DruidCoordinator.java | 2 +- .../org/apache/druid/sql/calcite/schema/SystemSchema.java | 4 +++- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java index a673e6e39dcd..206e7ea70ce5 100644 --- a/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java +++ b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java @@ -27,7 +27,7 @@ import java.util.Objects; /** - * DataSegment object plus the overshadowed and target number of replicants for the segment. An immutable object. + * DataSegment object plus the overshadowed and target replication factor for the segment. An immutable object. *

* SegmentStatusInCluster's {@link #compareTo} method considers only the {@link SegmentId} * of the DataSegment object. diff --git a/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java index 6b4b90884add..f13ab29e203d 100644 --- a/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java @@ -48,7 +48,7 @@ public class SegmentStatusInClusterTest private static final Interval INTERVAL = Intervals.of("2011-10-01/2011-10-02"); private static final ImmutableMap LOAD_SPEC = ImmutableMap.of("something", "or_other"); private static final boolean OVERSHADOWED = true; - private static final Integer TOTAL_REPLICATION_FACTOR = 2; + private static final Integer REPLICATION_FACTOR = 2; private static final int TEST_VERSION = 0x9; private static final SegmentStatusInCluster SEGMENT = createSegmentForTest(); @@ -76,7 +76,7 @@ private static SegmentStatusInCluster createSegmentForTest() 1 ); - return new SegmentStatusInCluster(dataSegment, OVERSHADOWED, TOTAL_REPLICATION_FACTOR); + return new SegmentStatusInCluster(dataSegment, OVERSHADOWED, REPLICATION_FACTOR); } @Test @@ -98,7 +98,7 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion")); Assert.assertEquals(1, objectMap.get("size")); Assert.assertEquals(OVERSHADOWED, objectMap.get("overshadowed")); - Assert.assertEquals(TOTAL_REPLICATION_FACTOR, objectMap.get("replicationFactor")); + Assert.assertEquals(REPLICATION_FACTOR, objectMap.get("replicationFactor")); final String json = MAPPER.writeValueAsString(SEGMENT); @@ -118,7 +118,7 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize()); 
Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId()); Assert.assertEquals(OVERSHADOWED, deserializedSegment.isOvershadowed()); - Assert.assertEquals(TOTAL_REPLICATION_FACTOR, deserializedSegment.getReplicationFactor()); + Assert.assertEquals(REPLICATION_FACTOR, deserializedSegment.getReplicationFactor()); } // Previously, the implementation of SegmentStatusInCluster had @JsonCreator/@JsonProperty and @JsonUnwrapped diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index bbed09147585..456256463fc1 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -156,7 +156,7 @@ public class DruidCoordinator private volatile SegmentReplicantLookup segmentReplicantLookup = null; /** - * Contains a map of segmentId to total replciation factor across all tiers. This map is refreshed when load rules are + * Contains a map of segmentId to total replication factor across all tiers. This map is refreshed when load rules are * evaluated. It is used by {@link DruidCoordinator} to supply this value to * {@link org.apache.druid.server.http.MetadataResource}. */ diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 9e0c9b905384..2c60e9eda1ce 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -330,6 +330,8 @@ public Enumerable scan(DataContext root) segment.getDimensions() == null ? null : jsonMapper.writeValueAsString(segment.getDimensions()), segment.getMetrics() == null ? null : jsonMapper.writeValueAsString(segment.getMetrics()), segment.getLastCompactionState() == null ? 
null : jsonMapper.writeValueAsString(segment.getLastCompactionState()), + // If the value is null, the load rules might have not evaluated yet, and we don't know the replication factor. + // This should be automatically updated in the next refesh with Coordinator. val.getReplicationFactor() == null ? REPLICATION_FACTOR_UNKNOWN : (long) val.getReplicationFactor() }; } @@ -373,7 +375,7 @@ public Enumerable scan(DataContext root) val.getValue().getSegment().getDimensions() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()), val.getValue().getSegment().getMetrics() == null ? null : jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()), null, // unpublished segments from realtime tasks will not be compacted yet - REPLICATION_FACTOR_UNKNOWN + REPLICATION_FACTOR_UNKNOWN // If the segment is unpublished, we won't have this information yet. }; } catch (JsonProcessingException e) { From c1e98f8f9bcc98adc70d365e83f96003f2f8f3fc Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 12 Jun 2023 10:27:35 +0530 Subject: [PATCH 06/18] Update comments --- .../org/apache/druid/timeline/SegmentStatusInCluster.java | 4 ++-- .../druid/server/coordinator/SegmentReplicantLookup.java | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java index 206e7ea70ce5..1c4c9f0e3717 100644 --- a/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java +++ b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java @@ -27,7 +27,7 @@ import java.util.Objects; /** - * DataSegment object plus the overshadowed and target replication factor for the segment. An immutable object. + * DataSegment object plus the overshadowed and replication factor for the segment. An immutable object. *

* SegmentStatusInCluster's {@link #compareTo} method considers only the {@link SegmentId} * of the DataSegment object. @@ -36,7 +36,7 @@ public class SegmentStatusInCluster implements Comparable Date: Mon, 12 Jun 2023 11:23:34 +0530 Subject: [PATCH 07/18] Update docs --- docs/querying/sql-metadata-tables.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/querying/sql-metadata-tables.md b/docs/querying/sql-metadata-tables.md index 3d6093659560..bfa73f49d117 100644 --- a/docs/querying/sql-metadata-tables.md +++ b/docs/querying/sql-metadata-tables.md @@ -137,6 +137,7 @@ Segments table provides details on all Druid segments, whether they are publishe |dimensions|VARCHAR|JSON-serialized form of the segment dimensions| |metrics|VARCHAR|JSON-serialized form of the segment metrics| |last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.| +|replication factor|BIGINT|Total number of replicants across all tiers for the segment. If this value is 0, the segment is not assigned to any historical and will never be loaded. 
If this value is -1, the load rules might not have evaluated yet for the segment and the replication factor is currently unavailable.| For example, to retrieve all currently active segments for datasource "wikipedia", use the query: From 9c9b9755cf6486f52c4668044e360fb59cd1e6f7 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 12 Jun 2023 22:05:21 +0530 Subject: [PATCH 08/18] Update doc with correct column name --- docs/querying/sql-metadata-tables.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/querying/sql-metadata-tables.md b/docs/querying/sql-metadata-tables.md index bfa73f49d117..e301eee4443f 100644 --- a/docs/querying/sql-metadata-tables.md +++ b/docs/querying/sql-metadata-tables.md @@ -137,7 +137,7 @@ Segments table provides details on all Druid segments, whether they are publishe |dimensions|VARCHAR|JSON-serialized form of the segment dimensions| |metrics|VARCHAR|JSON-serialized form of the segment metrics| |last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.| -|replication factor|BIGINT|Total number of replicants across all tiers for the segment. If this value is 0, the segment is not assigned to any historical and will never be loaded. If this value is -1, the load rules might not have evaluated yet for the segment and the replication factor is currently unavailable.| +|replication_factor|BIGINT|Total number of replicants across all tiers for the segment. If this value is 0, the segment is not assigned to any historical and will never be loaded. 
If this value is -1, the load rules might not have evaluated yet for the segment and the replication factor is currently unavailable.| For example, to retrieve all currently active segments for datasource "wikipedia", use the query: From e7e03500cf6733657fda45fc9d4a5e572ce358b6 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 13 Jun 2023 08:44:30 +0530 Subject: [PATCH 09/18] Fix ITs --- .../test/resources/results/auth_test_sys_schema_segments.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json index 1cb97572db5f..1ce7b44bc617 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json @@ -17,6 +17,7 @@ "shard_spec": "{\"type\":\"none\"}", "dimensions": "[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]", "metrics": "[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]", - "last_compaction_state": null + "last_compaction_state": null, + "replication_factor": 2 } ] From 130e4207ea0310dae4f748eba150ed2b2687f1e1 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 13 Jun 2023 09:17:39 +0530 Subject: [PATCH 10/18] Update spellcheck --- website/.spelling | 1 + 1 file changed, 1 insertion(+) diff --git a/website/.spelling b/website/.spelling index 59e04904ed48..ccba11c6ca7f 100644 --- a/website/.spelling +++ b/website/.spelling @@ -639,6 +639,7 @@ num_segments partition_num plaintext_port queue_insertion_time +replication_factor runner_status segment_id server_type From 0bda368b705db3dda6940e15bc339a4d55e81eba Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev 
Date: Wed, 14 Jun 2023 09:07:10 +0530 Subject: [PATCH 11/18] Rename variables and update comments --- docs/querying/sql-metadata-tables.md | 2 +- .../apache/druid/timeline/SegmentStatusInCluster.java | 11 ++++++++--- .../druid/timeline/SegmentStatusInClusterTest.java | 5 +---- .../server/coordinator/SegmentReplicantLookup.java | 7 +++---- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/docs/querying/sql-metadata-tables.md b/docs/querying/sql-metadata-tables.md index e301eee4443f..79dcb533c2d3 100644 --- a/docs/querying/sql-metadata-tables.md +++ b/docs/querying/sql-metadata-tables.md @@ -137,7 +137,7 @@ Segments table provides details on all Druid segments, whether they are publishe |dimensions|VARCHAR|JSON-serialized form of the segment dimensions| |metrics|VARCHAR|JSON-serialized form of the segment metrics| |last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.| -|replication_factor|BIGINT|Total number of replicants across all tiers for the segment. If this value is 0, the segment is not assigned to any historical and will never be loaded. If this value is -1, the load rules might not have evaluated yet for the segment and the replication factor is currently unavailable.| +|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded. 
This value is -1 if load rules for the segment have not been evaluated yet or if the segment is overshadowed and will soon be marked as unused.| For example, to retrieve all currently active segments for datasource "wikipedia", use the query: diff --git a/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java index 1c4c9f0e3717..4e5577f76039 100644 --- a/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java +++ b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java @@ -27,10 +27,15 @@ import java.util.Objects; /** - * DataSegment object plus the overshadowed and replication factor for the segment. An immutable object. + * This class represents the current state of a segment in the cluster and encapsulates the following: + *
+ * <ul>
+ * <li>the {@code DataSegment} object</li>
+ * <li>overshadowed status of the segment</li>
+ * <li>replication factor of the segment</li>
+ * </ul>
+ *

- * SegmentStatusInCluster's {@link #compareTo} method considers only the {@link SegmentId} - * of the DataSegment object. + * Objects of this class are used to sync the state of segments from the Coordinator to different services, typically the Broker. + * The {@link #compareTo} method considers only the {@link SegmentId}. */ public class SegmentStatusInCluster implements Comparable { diff --git a/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java index f13ab29e203d..a7690faa90c4 100644 --- a/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java +++ b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java @@ -205,10 +205,7 @@ public boolean equals(Object o) return false; } TestSegment that = (TestSegment) o; - return overshadowed == that.overshadowed && Objects.equals( - replicationFactor, - that.replicationFactor - ); + return overshadowed == that.overshadowed && Objects.equals(replicationFactor, that.replicationFactor); } @Override diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java index 01ecc38bda11..7ec7abe02a9e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java @@ -31,7 +31,6 @@ import java.util.HashMap; import java.util.Map; import java.util.SortedSet; -import java.util.concurrent.ConcurrentHashMap; /** * A lookup for the number of replicants of a given segment for a certain tier. 
@@ -81,7 +80,7 @@ public static SegmentReplicantLookup make(DruidCluster cluster, boolean replicat private final Table segmentsInCluster; private final Table loadingSegments; - private final ConcurrentHashMap replicationFactorMap = new ConcurrentHashMap<>(); + private final Map segmentIdToReplicationFactor = new HashMap<>(); private final DruidCluster cluster; private SegmentReplicantLookup( @@ -121,12 +120,12 @@ public int getLoadedReplicants(SegmentId segmentId, String tier) // This would be revamped in https://github.com/apache/druid/pull/13197 public void setReplicationFactor(SegmentId segmentId, Integer requiredReplicas) { - replicationFactorMap.put(segmentId, requiredReplicas); + segmentIdToReplicationFactor.put(segmentId, requiredReplicas); } public Map getSegmentIdToReplicationFactor() { - return ImmutableMap.copyOf(replicationFactorMap); + return ImmutableMap.copyOf(segmentIdToReplicationFactor); } private int getLoadingReplicants(SegmentId segmentId, String tier) From 74473f20decac50b90e0896113378d0ea16cf437 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 14 Jun 2023 15:30:36 +0530 Subject: [PATCH 12/18] Cache responses in the broker to prevent incorrect values if coordinator fails --- docs/querying/sql-metadata-tables.md | 2 +- .../druid/server/http/MetadataResource.java | 23 +++++++++++++------ .../calcite/schema/MetadataSegmentView.java | 15 +++++++++++- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/docs/querying/sql-metadata-tables.md b/docs/querying/sql-metadata-tables.md index 79dcb533c2d3..dc948cc51f5f 100644 --- a/docs/querying/sql-metadata-tables.md +++ b/docs/querying/sql-metadata-tables.md @@ -137,7 +137,7 @@ Segments table provides details on all Druid segments, whether they are publishe |dimensions|VARCHAR|JSON-serialized form of the segment dimensions| |metrics|VARCHAR|JSON-serialized form of the segment metrics| |last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task 
which created this segment). May be null if segment was not created by compaction task.| -|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded. This value is -1 if load rules for the segment have not been evaluated yet or if the segment is overshadowed and will soon be marked as unused.| +|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded or is about to be unloaded if it is currently loaded. This value is -1 if load rules for the segment have not been evaluated yet.| For example, to retrieve all currently active segments for datasource "wikipedia", use the query: diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 9f2ca8b37f62..0583cc87a162 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -189,13 +189,22 @@ private Response getAllUsedSegmentsWithAdditionalDetails( final Set overshadowedSegments = dataSourcesSnapshot.getOvershadowedSegments(); final Stream segmentStatus = usedSegments - .map(segment -> - new SegmentStatusInCluster( - segment, - overshadowedSegments.contains(segment), - coordinator.getReplicationFactorForSegment(segment.getId()) - ) - ); + .map(segment -> { + boolean isOvershadowed = overshadowedSegments.contains(segment); + Integer replicationFactor; + if (isOvershadowed) { + // If the segment is overshadowed, the replication factor won't be present in the coordinator, but we 
know + // that it should be 0 as we will unload it soon. + replicationFactor = 0; + } else { + replicationFactor = coordinator.getReplicationFactorForSegment(segment.getId()); + } + return new SegmentStatusInCluster( + segment, + isOvershadowed, + replicationFactor + ); + }); final Function> raGenerator = segment -> Collections .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource())); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 7b8a0b3e9dc7..679cffe76273 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableSortedSet; import com.google.common.util.concurrent.Uninterruptibles; import com.google.inject.Inject; +import org.apache.curator.shaded.com.google.common.cache.Cache; +import org.apache.curator.shaded.com.google.common.cache.CacheBuilder; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.DataSegmentInterner; import org.apache.druid.client.JsonParserIterator; @@ -40,6 +42,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.sql.calcite.planner.SegmentMetadataCacheConfig; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentStatusInCluster; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -77,6 +80,7 @@ public class MetadataSegmentView */ @MonotonicNonNull private volatile ImmutableSortedSet publishedSegments = null; + private final Cache segmentIdToReplicationFactor; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @@ 
-97,6 +101,9 @@ public MetadataSegmentView( this.isCacheEnabled = config.isMetadataSegmentCacheEnable(); this.pollPeriodInMS = config.getMetadataSegmentPollPeriod(); this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); + this.segmentIdToReplicationFactor = CacheBuilder.newBuilder() + .expireAfterWrite(3, TimeUnit.MINUTES) + .build(); } @LifecycleStart @@ -143,10 +150,16 @@ private void poll() while (metadataSegments.hasNext()) { final SegmentStatusInCluster segment = metadataSegments.next(); final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); + Integer replicationFactor = segment.getReplicationFactor(); + if (segment.getReplicationFactor() == null) { + replicationFactor = segmentIdToReplicationFactor.getIfPresent(segment.getDataSegment().getId()); + } else { + segmentIdToReplicationFactor.put(segment.getDataSegment().getId(), segment.getReplicationFactor()); + } final SegmentStatusInCluster segmentStatusInCluster = new SegmentStatusInCluster( interned, segment.isOvershadowed(), - segment.getReplicationFactor() + replicationFactor ); builder.add(segmentStatusInCluster); } From 7c43b8b98a0f41f879b707753e19c5cad0bb872c Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 15 Jun 2023 19:21:54 +0530 Subject: [PATCH 13/18] Add tests --- docs/querying/sql-metadata-tables.md | 2 +- .../server/coordinator/DruidCoordinator.java | 2 +- .../coordinator/SegmentReplicantLookup.java | 7 +- .../server/http/MetadataResourceTest.java | 163 ++++++++++++++++++ .../calcite/schema/MetadataSegmentView.java | 8 +- 5 files changed, 175 insertions(+), 7 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java diff --git a/docs/querying/sql-metadata-tables.md b/docs/querying/sql-metadata-tables.md index dc948cc51f5f..0275bd984d61 100644 --- a/docs/querying/sql-metadata-tables.md +++ b/docs/querying/sql-metadata-tables.md @@ -137,7 +137,7 @@ Segments table provides details on 
all Druid segments, whether they are publishe |dimensions|VARCHAR|JSON-serialized form of the segment dimensions| |metrics|VARCHAR|JSON-serialized form of the segment metrics| |last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.| -|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded or is about to be unloaded if it is currently loaded. This value is -1 if load rules for the segment have not been evaluated yet.| +|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded. This value is -1 if load rules for the segment have not been evaluated yet.| For example, to retrieve all currently active segments for datasource "wikipedia", use the query: diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 456256463fc1..b7de2cbf3af8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -160,7 +160,7 @@ public class DruidCoordinator * evaluated. It is used by {@link DruidCoordinator} to supply this value to * {@link org.apache.druid.server.http.MetadataResource}. 
*/ - private volatile Map segmentIdToReplicationFactor = null; + private volatile Object2IntMap segmentIdToReplicationFactor = null; private volatile DruidCluster cluster = null; private int cachedBalancerThreadNumber; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java index 7ec7abe02a9e..96e4fe812760 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java @@ -20,8 +20,9 @@ package org.apache.druid.server.coordinator; import com.google.common.collect.HashBasedTable; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Table; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2LongMap; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.client.ImmutableDruidServer; @@ -123,9 +124,9 @@ public void setReplicationFactor(SegmentId segmentId, Integer requiredReplicas) segmentIdToReplicationFactor.put(segmentId, requiredReplicas); } - public Map getSegmentIdToReplicationFactor() + public Object2IntMap getSegmentIdToReplicationFactor() { - return ImmutableMap.copyOf(segmentIdToReplicationFactor); + return new Object2IntOpenHashMap<>(segmentIdToReplicationFactor); } private int getLoadingReplicants(SegmentId segmentId, String tier) diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java new file mode 100644 index 000000000000..e8dd05c8ed9a --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java @@ -0,0 +1,163 @@ +package org.apache.druid.server.http; + +import 
com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.client.DataSourcesSnapshot; +import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.server.coordinator.DruidCoordinator; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentStatusInCluster; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.Response; +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +public class MetadataResourceTest +{ + private static final String DATASOURCE1 = "datasource1"; + private static final String DATASOURCE2 = "datasource2"; + + private MetadataResource metadataResource; + + private SegmentsMetadataManager segmentsMetadataManager; + private DruidCoordinator coordinator; + private HttpServletRequest request; + + private final DataSegment dataSegment1 = new DataSegment( + DATASOURCE1, + Intervals.of("2010-01-01/P1D"), + "v0", + null, + null, + null, + null, + 0x9, + 10 + ); + + private final DataSegment dataSegment2 = new DataSegment( + DATASOURCE1, + Intervals.of("2010-01-22/P1D"), + "v0", + null, + null, + null, + null, + 0x9, + 20 + ); + + private final DataSegment dataSegment3 = new DataSegment( + DATASOURCE2, + Intervals.of("2010-01-01/P1M"), + "v0", + null, + null, + null, + null, + 0x9, + 30 + ); + + private final 
DataSegment dataSegment4 = new DataSegment( + DATASOURCE2, + Intervals.of("2010-01-02/P1D"), + "v0", + null, + null, + null, + null, + 0x9, + 35 + ); + + @Before + public void setUp() throws Exception + { + // Create mock request + request = mock(HttpServletRequest.class); + doReturn(mock(AuthenticationResult.class)).when(request).getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT); + + // Mock SegmentsMetadataManager + segmentsMetadataManager = mock(SegmentsMetadataManager.class); + ImmutableDruidDataSource druidDataSource1 = new ImmutableDruidDataSource( + DATASOURCE1, + ImmutableMap.of(), + ImmutableList.of( + dataSegment1, + dataSegment2 + ) + ); + + ImmutableDruidDataSource druidDataSource2 = new ImmutableDruidDataSource( + DATASOURCE1, + ImmutableMap.of(), + ImmutableList.of( + dataSegment3, + dataSegment4 + ) + ); + + // Mock segments from cache and coordinator + DataSourcesSnapshot dataSourcesSnapshot = mock(DataSourcesSnapshot.class); + doReturn(dataSourcesSnapshot).when(segmentsMetadataManager).getSnapshotOfDataSourcesWithAllUsedSegments(); + + doReturn(ImmutableList.of(druidDataSource1, druidDataSource2)).when(dataSourcesSnapshot).getDataSourcesWithAllUsedSegments(); + // Segment 4 is overshadowed + + // Mock Coordinator + coordinator = mock(DruidCoordinator.class); + // Segment 1: Replication factor 2, not overshadowed + doReturn(2).when(coordinator).getReplicationFactorForSegment(dataSegment1.getId()); + // Segment 2: Replication factor null, not overshadowed + doReturn(null).when(coordinator).getReplicationFactorForSegment(dataSegment2.getId()); + // Segment 3: Replication factor 1, not overshadowed + doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment3.getId()); + // Segment 4: Replication factor 1, overshadowed + doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment4.getId()); + doReturn(ImmutableSet.of(dataSegment4)).when(dataSourcesSnapshot).getOvershadowedSegments(); + + metadataResource = new 
MetadataResource(segmentsMetadataManager, mock(IndexerMetadataStorageCoordinator.class), AuthTestUtils.TEST_AUTHORIZER_MAPPER, coordinator, new ObjectMapper()); + } + + @Test + public void testGetAllSegmentsWithOvershadowedStatus() + { + Response response = metadataResource.getAllUsedSegments( + request, + null, + "includeOvershadowedStatus" + ); + + List resultList = materializeResponse(response); + Assert.assertEquals(resultList.size(), 4); + Assert.assertEquals(new SegmentStatusInCluster(dataSegment1, false, 2), resultList.get(0)); + Assert.assertEquals(new SegmentStatusInCluster(dataSegment2, false, null), resultList.get(1)); + Assert.assertEquals(new SegmentStatusInCluster(dataSegment3, false, 1), resultList.get(2)); + // Replication factor should be 0 as the segment is overshadowed + Assert.assertEquals(new SegmentStatusInCluster(dataSegment4, true, 0), resultList.get(3)); + } + + private List materializeResponse(Response response) + { + Iterable resultIterator = (Iterable ) response.getEntity(); + List segmentStatusInClusters = new ArrayList<>(); + resultIterator.forEach(segmentStatusInClusters::add); + return segmentStatusInClusters; + } +} \ No newline at end of file diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 679cffe76273..d09d45db7a01 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -80,6 +80,10 @@ public class MetadataSegmentView */ @MonotonicNonNull private volatile ImmutableSortedSet publishedSegments = null; + /** + * Caches segmentId vs replication factor. In case the coordinator restarts, this is used to refer to previous values + * to prevent randomly flapping to null. 
+ */ private final Cache segmentIdToReplicationFactor; private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; @@ -102,7 +106,7 @@ public MetadataSegmentView( this.pollPeriodInMS = config.getMetadataSegmentPollPeriod(); this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); this.segmentIdToReplicationFactor = CacheBuilder.newBuilder() - .expireAfterWrite(3, TimeUnit.MINUTES) + .expireAfterWrite(10, TimeUnit.MINUTES) .build(); } @@ -151,7 +155,7 @@ private void poll() final SegmentStatusInCluster segment = metadataSegments.next(); final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment()); Integer replicationFactor = segment.getReplicationFactor(); - if (segment.getReplicationFactor() == null) { + if (replicationFactor == null) { replicationFactor = segmentIdToReplicationFactor.getIfPresent(segment.getDataSegment().getId()); } else { segmentIdToReplicationFactor.put(segment.getDataSegment().getId(), segment.getReplicationFactor()); From 4822492c41118d1ebeb8dc267dcbfd87e6e0c1c5 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Thu, 15 Jun 2023 21:43:14 +0530 Subject: [PATCH 14/18] Fix checkstyle --- .../server/http/MetadataResourceTest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java index e8dd05c8ed9a..1763eb03bc5b 100644 --- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.server.http; import com.fasterxml.jackson.databind.ObjectMapper; From 94e02042c182c3fb65710256a38b9c7d10dfe2d3 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 16 Jun 2023 08:10:43 +0530 Subject: [PATCH 15/18] Resolve checkstyle failures --- .../druid/server/coordinator/DruidCoordinator.java | 2 +- .../druid/server/http/MetadataResourceTest.java | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index b7de2cbf3af8..e37ab05d6dbc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -827,7 +827,7 @@ private List makeCompactSegmentsDuty() @Nullable public Integer getReplicationFactorForSegment(SegmentId segmentId) { - return segmentIdToReplicationFactor == null ? null : segmentIdToReplicationFactor.get(segmentId); + return segmentIdToReplicationFactor == null ? 
null : segmentIdToReplicationFactor.getInt(segmentId); } @VisibleForTesting diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java index 1763eb03bc5b..10662a79dedd 100644 --- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java @@ -151,7 +151,7 @@ public void setUp() throws Exception doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment4.getId()); doReturn(ImmutableSet.of(dataSegment4)).when(dataSourcesSnapshot).getOvershadowedSegments(); - metadataResource = new MetadataResource(segmentsMetadataManager, mock(IndexerMetadataStorageCoordinator.class), AuthTestUtils.TEST_AUTHORIZER_MAPPER, coordinator, new ObjectMapper()); + metadataResource = new MetadataResource(segmentsMetadataManager, mock(IndexerMetadataStorageCoordinator.class), AuthTestUtils.TEST_AUTHORIZER_MAPPER, coordinator, new ObjectMapper()); } @Test @@ -165,18 +165,18 @@ public void testGetAllSegmentsWithOvershadowedStatus() List resultList = materializeResponse(response); Assert.assertEquals(resultList.size(), 4); - Assert.assertEquals(new SegmentStatusInCluster(dataSegment1, false, 2), resultList.get(0)); - Assert.assertEquals(new SegmentStatusInCluster(dataSegment2, false, null), resultList.get(1)); - Assert.assertEquals(new SegmentStatusInCluster(dataSegment3, false, 1), resultList.get(2)); + Assert.assertEquals(new SegmentStatusInCluster(dataSegment1, false, 2), resultList.get(0)); + Assert.assertEquals(new SegmentStatusInCluster(dataSegment2, false, null), resultList.get(1)); + Assert.assertEquals(new SegmentStatusInCluster(dataSegment3, false, 1), resultList.get(2)); // Replication factor should be 0 as the segment is overshadowed - Assert.assertEquals(new SegmentStatusInCluster(dataSegment4, true, 0), resultList.get(3)); + Assert.assertEquals(new 
SegmentStatusInCluster(dataSegment4, true, 0), resultList.get(3)); } private List materializeResponse(Response response) { - Iterable resultIterator = (Iterable ) response.getEntity(); + Iterable resultIterator = (Iterable) response.getEntity(); List segmentStatusInClusters = new ArrayList<>(); resultIterator.forEach(segmentStatusInClusters::add); return segmentStatusInClusters; } -} \ No newline at end of file +} From 59fe7b48037249e5ccf446ecd5941a4da2f3feb1 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 16 Jun 2023 11:26:09 +0530 Subject: [PATCH 16/18] Resolve missing dependency build failure --- .../apache/druid/sql/calcite/schema/MetadataSegmentView.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index d09d45db7a01..5ff2e2c5fd84 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -22,11 +22,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableSortedSet; import com.google.common.util.concurrent.Uninterruptibles; import com.google.inject.Inject; -import org.apache.curator.shaded.com.google.common.cache.Cache; -import org.apache.curator.shaded.com.google.common.cache.CacheBuilder; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.DataSegmentInterner; import org.apache.druid.client.JsonParserIterator; From ac977a1a69db930b2a1dd8c1f4df03a351a91d79 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Fri, 16 Jun 2023 14:59:07 +0530 Subject: [PATCH 17/18] Address review comments --- 
.../java/org/apache/druid/server/http/MetadataResourceTest.java | 2 +- .../apache/druid/sql/calcite/schema/MetadataSegmentView.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java index 10662a79dedd..32c1f6cd3ab6 100644 --- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java @@ -106,7 +106,7 @@ public class MetadataResourceTest ); @Before - public void setUp() throws Exception + public void setUp() { // Create mock request request = mock(HttpServletRequest.class); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 5ff2e2c5fd84..73c33c58861f 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -106,7 +106,7 @@ public MetadataSegmentView( this.pollPeriodInMS = config.getMetadataSegmentPollPeriod(); this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); this.segmentIdToReplicationFactor = CacheBuilder.newBuilder() - .expireAfterWrite(10, TimeUnit.MINUTES) + .expireAfterAccess(10, TimeUnit.MINUTES) .build(); } From 2d55219c4467cca5599cc85443e55d95e86f33ca Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Sat, 17 Jun 2023 16:45:43 +0530 Subject: [PATCH 18/18] Update comments --- .../apache/druid/server/http/MetadataResourceTest.java | 10 ---------- .../druid/sql/calcite/schema/MetadataSegmentView.java | 4 ++-- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java 
b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java index 32c1f6cd3ab6..f87b65d606d6 100644 --- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java @@ -108,11 +108,9 @@ public class MetadataResourceTest @Before public void setUp() { - // Create mock request request = mock(HttpServletRequest.class); doReturn(mock(AuthenticationResult.class)).when(request).getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT); - // Mock SegmentsMetadataManager segmentsMetadataManager = mock(SegmentsMetadataManager.class); ImmutableDruidDataSource druidDataSource1 = new ImmutableDruidDataSource( DATASOURCE1, @@ -132,22 +130,14 @@ public void setUp() ) ); - // Mock segments from cache and coordinator DataSourcesSnapshot dataSourcesSnapshot = mock(DataSourcesSnapshot.class); doReturn(dataSourcesSnapshot).when(segmentsMetadataManager).getSnapshotOfDataSourcesWithAllUsedSegments(); - doReturn(ImmutableList.of(druidDataSource1, druidDataSource2)).when(dataSourcesSnapshot).getDataSourcesWithAllUsedSegments(); - // Segment 4 is overshadowed - // Mock Coordinator coordinator = mock(DruidCoordinator.class); - // Segment 1: Replication factor 2, not overshadowed doReturn(2).when(coordinator).getReplicationFactorForSegment(dataSegment1.getId()); - // Segment 2: Replication factor null, not overshadowed doReturn(null).when(coordinator).getReplicationFactorForSegment(dataSegment2.getId()); - // Segment 3: Replication factor 1, not overshadowed doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment3.getId()); - // Segment 4: Replication factor 1, overshadowed doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment4.getId()); doReturn(ImmutableSet.of(dataSegment4)).when(dataSourcesSnapshot).getOvershadowedSegments(); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 73c33c58861f..c3a1ab48f5ab 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -81,8 +81,8 @@ public class MetadataSegmentView @MonotonicNonNull private volatile ImmutableSortedSet publishedSegments = null; /** - * Caches segmentId vs replication factor. In case the coordinator restarts, this is used to refer to previous values - * to prevent randomly flapping to null. + * Caches the replication factor for segment IDs. In case of coordinator restarts or leadership re-elections, the coordinator API returns `null` replication factor until load rules are evaluated. + * The cache can be used during these periods to continue serving the previously fetched values. */ private final Cache segmentIdToReplicationFactor; private final ScheduledExecutorService scheduledExec;