diff --git a/docs/api-reference/api-reference.md b/docs/api-reference/api-reference.md
index 9b762c08183a..2a6c36deae13 100644
--- a/docs/api-reference/api-reference.md
+++ b/docs/api-reference/api-reference.md
@@ -173,11 +173,11 @@ Returns a list of all segments for one or more specific datasources enabled in t
`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus`
-Returns a list of all segments for each datasource with the full segment metadata and an extra field `overshadowed`.
+Returns a list of all segments for each datasource with the full segment metadata and extra fields `overshadowed` and `replicationFactor`.
`GET /druid/coordinator/v1/metadata/segments?includeOvershadowedStatus&datasources={dataSourceName1}&datasources={dataSourceName2}`
-Returns a list of all segments for one or more specific datasources with the full segment metadata and an extra field `overshadowed`.
+Returns a list of all segments for one or more specific datasources with the full segment metadata and extra fields `overshadowed` and `replicationFactor`.
`GET /druid/coordinator/v1/metadata/datasources`
diff --git a/docs/querying/sql-metadata-tables.md b/docs/querying/sql-metadata-tables.md
index 3d6093659560..0275bd984d61 100644
--- a/docs/querying/sql-metadata-tables.md
+++ b/docs/querying/sql-metadata-tables.md
@@ -137,6 +137,7 @@ Segments table provides details on all Druid segments, whether they are publishe
|dimensions|VARCHAR|JSON-serialized form of the segment dimensions|
|metrics|VARCHAR|JSON-serialized form of the segment metrics|
|last_compaction_state|VARCHAR|JSON-serialized form of the compaction task's config (compaction task which created this segment). May be null if segment was not created by compaction task.|
+|replication_factor|BIGINT|Total number of replicas of the segment that are required to be loaded across all historical tiers, based on the load rule that currently applies to this segment. If this value is 0, the segment is not assigned to any historical and will not be loaded. This value is -1 if load rules for the segment have not been evaluated yet.|
For example, to retrieve all currently active segments for datasource "wikipedia", use the query:
diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
index 1cb97572db5f..1ce7b44bc617 100644
--- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
+++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
@@ -17,6 +17,7 @@
"shard_spec": "{\"type\":\"none\"}",
"dimensions": "[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]",
"metrics": "[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]",
- "last_compaction_state": null
+ "last_compaction_state": null,
+ "replication_factor": 2
}
]
diff --git a/processing/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java
similarity index 52%
rename from processing/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
rename to processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java
index 04c8bf4378b6..4e5577f76039 100644
--- a/processing/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
+++ b/processing/src/main/java/org/apache/druid/timeline/SegmentStatusInCluster.java
@@ -23,39 +23,55 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonUnwrapped;
+import javax.annotation.Nullable;
+import java.util.Objects;
+
/**
- * DataSegment object plus the overshadowed status for the segment. An immutable object.
- *
- * SegmentWithOvershadowedStatus's {@link #compareTo} method considers only the {@link SegmentId}
- * of the DataSegment object.
+ * This class represents the current state of a segment in the cluster and encapsulates the following:
+ * <ul>
+ * <li>the {@code DataSegment} object</li>
+ * <li>overshadowed status of the segment</li>
+ * <li>replication factor of the segment</li>
+ * </ul>
+ *
+ * Objects of this class are used to sync the state of segments from the Coordinator to different services, typically the Broker.
+ * The {@link #compareTo} method considers only the {@link SegmentId}.
*/
-public class SegmentWithOvershadowedStatus implements Comparable
+public class SegmentStatusInCluster implements Comparable
{
private final boolean overshadowed;
+ /**
+ * The replication factor for the segment, summed across all tiers. This value is null if the load rules for
+ * the segment have not been evaluated yet.
+ */
+ private final Integer replicationFactor;
/**
* dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of
- * enclosing class. If in future, if {@code SegmentWithOvershadowedStatus} were to extend {@link DataSegment},
+ * enclosing class. If, in the future, {@code SegmentStatusInCluster} were to extend {@link DataSegment},
* there will be no change in the serialized format.
*/
@JsonUnwrapped
private final DataSegment dataSegment;
@JsonCreator
- public SegmentWithOvershadowedStatus(
- @JsonProperty("overshadowed") boolean overshadowed
+ public SegmentStatusInCluster(
+ @JsonProperty("overshadowed") boolean overshadowed,
+ @JsonProperty("replicationFactor") @Nullable Integer replicationFactor
)
{
// Jackson will overwrite dataSegment if needed (even though the field is final)
- this(null, overshadowed);
+ this(null, overshadowed, replicationFactor);
}
- public SegmentWithOvershadowedStatus(
+ public SegmentStatusInCluster(
DataSegment dataSegment,
- boolean overshadowed
+ boolean overshadowed,
+ @Nullable Integer replicationFactor
)
{
this.dataSegment = dataSegment;
this.overshadowed = overshadowed;
+ this.replicationFactor = replicationFactor;
}
@JsonProperty
@@ -70,35 +86,36 @@ public DataSegment getDataSegment()
return dataSegment;
}
+ @Nullable
+ @JsonProperty
+ public Integer getReplicationFactor()
+ {
+ return replicationFactor;
+ }
+
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
- if (!(o instanceof SegmentWithOvershadowedStatus)) {
- return false;
- }
- final SegmentWithOvershadowedStatus that = (SegmentWithOvershadowedStatus) o;
- if (!dataSegment.equals(that.dataSegment)) {
- return false;
- }
- if (overshadowed != (that.overshadowed)) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
- return true;
+ SegmentStatusInCluster that = (SegmentStatusInCluster) o;
+ return overshadowed == that.overshadowed
+ && Objects.equals(replicationFactor, that.replicationFactor)
+ && Objects.equals(dataSegment, that.dataSegment);
}
@Override
public int hashCode()
{
- int result = dataSegment.hashCode();
- result = 31 * result + Boolean.hashCode(overshadowed);
- return result;
+ return Objects.hash(overshadowed, replicationFactor, dataSegment);
}
@Override
- public int compareTo(SegmentWithOvershadowedStatus o)
+ public int compareTo(SegmentStatusInCluster o)
{
return dataSegment.getId().compareTo(o.dataSegment.getId());
}
@@ -106,8 +123,9 @@ public int compareTo(SegmentWithOvershadowedStatus o)
@Override
public String toString()
{
- return "SegmentWithOvershadowedStatus{" +
+ return "SegmentStatusInCluster{" +
"overshadowed=" + overshadowed +
+ ", replicationFactor=" + replicationFactor +
", dataSegment=" + dataSegment +
'}';
}
diff --git a/processing/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
similarity index 79%
rename from processing/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java
rename to processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
index fd239c4d2cce..a7690faa90c4 100644
--- a/processing/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
@@ -40,15 +40,17 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
-public class SegmentWithOvershadowedStatusTest
+public class SegmentStatusInClusterTest
{
private static final ObjectMapper MAPPER = createObjectMapper();
private static final Interval INTERVAL = Intervals.of("2011-10-01/2011-10-02");
private static final ImmutableMap LOAD_SPEC = ImmutableMap.of("something", "or_other");
private static final boolean OVERSHADOWED = true;
+ private static final Integer REPLICATION_FACTOR = 2;
private static final int TEST_VERSION = 0x9;
- private static final SegmentWithOvershadowedStatus SEGMENT = createSegmentWithOvershadowedStatus();
+ private static final SegmentStatusInCluster SEGMENT = createSegmentForTest();
private static ObjectMapper createObjectMapper()
{
@@ -59,7 +61,7 @@ private static ObjectMapper createObjectMapper()
return objectMapper;
}
- private static SegmentWithOvershadowedStatus createSegmentWithOvershadowedStatus()
+ private static SegmentStatusInCluster createSegmentForTest()
{
DataSegment dataSegment = new DataSegment(
"something",
@@ -74,7 +76,7 @@ private static SegmentWithOvershadowedStatus createSegmentWithOvershadowedStatus
1
);
- return new SegmentWithOvershadowedStatus(dataSegment, OVERSHADOWED);
+ return new SegmentStatusInCluster(dataSegment, OVERSHADOWED, REPLICATION_FACTOR);
}
@Test
@@ -85,7 +87,7 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
- Assert.assertEquals(11, objectMap.size());
+ Assert.assertEquals(12, objectMap.size());
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(INTERVAL.toString(), objectMap.get("interval"));
Assert.assertEquals("1", objectMap.get("version"));
@@ -96,12 +98,13 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
Assert.assertEquals(OVERSHADOWED, objectMap.get("overshadowed"));
+ Assert.assertEquals(REPLICATION_FACTOR, objectMap.get("replicationFactor"));
final String json = MAPPER.writeValueAsString(SEGMENT);
- final TestSegmentWithOvershadowedStatus deserializedSegment = MAPPER.readValue(
+ final TestSegment deserializedSegment = MAPPER.readValue(
json,
- TestSegmentWithOvershadowedStatus.class
+ TestSegment.class
);
DataSegment dataSegment = SEGMENT.getDataSegment();
@@ -114,30 +117,33 @@ public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws E
Assert.assertEquals(dataSegment.getShardSpec(), deserializedSegment.getShardSpec());
Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize());
Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId());
+ Assert.assertEquals(OVERSHADOWED, deserializedSegment.isOvershadowed());
+ Assert.assertEquals(REPLICATION_FACTOR, deserializedSegment.getReplicationFactor());
}
- // Previously, the implementation of SegmentWithOvershadowedStatus had @JsonCreator/@JsonProperty and @JsonUnwrapped
+ // Previously, the implementation of SegmentStatusInCluster had @JsonCreator/@JsonProperty and @JsonUnwrapped
// on the same field (dataSegment), which used to work in Jackson 2.6, but does not work with Jackson 2.9:
// https://github.com/FasterXML/jackson-databind/issues/265#issuecomment-264344051
@Test
public void testJsonCreatorAndJsonUnwrappedAnnotationsAreCompatible() throws Exception
{
String json = MAPPER.writeValueAsString(SEGMENT);
- SegmentWithOvershadowedStatus segment = MAPPER.readValue(json, SegmentWithOvershadowedStatus.class);
+ SegmentStatusInCluster segment = MAPPER.readValue(json, SegmentStatusInCluster.class);
Assert.assertEquals(SEGMENT, segment);
Assert.assertEquals(json, MAPPER.writeValueAsString(segment));
}
}
/**
- * Subclass of DataSegment with overshadowed status
+ * Flat subclass of DataSegment for testing
*/
-class TestSegmentWithOvershadowedStatus extends DataSegment
+class TestSegment extends DataSegment
{
private final boolean overshadowed;
+ private final Integer replicationFactor;
@JsonCreator
- public TestSegmentWithOvershadowedStatus(
+ public TestSegment(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@@ -154,7 +160,8 @@ public TestSegmentWithOvershadowedStatus(
@JsonProperty("lasCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
- @JsonProperty("overshadowed") boolean overshadowed
+ @JsonProperty("overshadowed") boolean overshadowed,
+ @JsonProperty("replicationFactor") Integer replicationFactor
)
{
super(
@@ -170,6 +177,7 @@ public TestSegmentWithOvershadowedStatus(
size
);
this.overshadowed = overshadowed;
+ this.replicationFactor = replicationFactor;
}
@JsonProperty
@@ -178,23 +186,31 @@ public boolean isOvershadowed()
return overshadowed;
}
+ @JsonProperty
+ public Integer getReplicationFactor()
+ {
+ return replicationFactor;
+ }
+
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
- if (!(o instanceof TestSegmentWithOvershadowedStatus)) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
- final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o;
- if (overshadowed != (that.overshadowed)) {
- return false;
- }
- return true;
+ TestSegment that = (TestSegment) o;
+ return overshadowed == that.overshadowed && Objects.equals(replicationFactor, that.replicationFactor);
}
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), overshadowed, replicationFactor);
+ }
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 8fce69e340b2..e37ab05d6dbc 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -154,6 +154,13 @@ public class DruidCoordinator
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;
+
+ /**
+ * Contains a map of segmentId to total replication factor across all tiers. This map is refreshed when load rules are
+ * evaluated. It is used by {@link DruidCoordinator} to supply this value to
+ * {@link org.apache.druid.server.http.MetadataResource}.
+ */
+ private volatile Object2IntMap segmentIdToReplicationFactor = null;
private volatile DruidCluster cluster = null;
private int cachedBalancerThreadNumber;
@@ -817,6 +824,12 @@ private List makeCompactSegmentsDuty()
return ImmutableList.of(compactSegments);
}
+ @Nullable
+ public Integer getReplicationFactorForSegment(SegmentId segmentId)
+ {
+ return segmentIdToReplicationFactor == null ? null : segmentIdToReplicationFactor.getInt(segmentId);
+ }
+
@VisibleForTesting
protected class DutiesRunnable implements Runnable
{
@@ -943,6 +956,13 @@ public void run()
}
}
}
+
+ // Update the immutable replication factor map with latest values.
+ // This value is set here as it is recalculated during load rule evaluation.
+ if (params.getSegmentReplicantLookup() != null) {
+ segmentIdToReplicationFactor = params.getSegmentReplicantLookup().getSegmentIdToReplicationFactor();
+ }
+
// Emit the runtime of the full DutiesRunnable
params.getEmitter().emit(
new ServiceMetricEvent.Builder()
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
index 3a4bdb9f6274..96e4fe812760 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
@@ -21,6 +21,8 @@
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.ImmutableDruidServer;
@@ -79,6 +81,7 @@ public static SegmentReplicantLookup make(DruidCluster cluster, boolean replicat
private final Table segmentsInCluster;
private final Table loadingSegments;
+ private final Map segmentIdToReplicationFactor = new HashMap<>();
private final DruidCluster cluster;
private SegmentReplicantLookup(
@@ -114,6 +117,18 @@ public int getLoadedReplicants(SegmentId segmentId, String tier)
return (retVal == null) ? 0 : retVal;
}
+ // TODO: Refactor this setter, as this class is following a singleton pattern with only getters, and this breaks convention.
+ // This would be revamped in https://github.com/apache/druid/pull/13197
+ public void setReplicationFactor(SegmentId segmentId, Integer requiredReplicas)
+ {
+ segmentIdToReplicationFactor.put(segmentId, requiredReplicas);
+ }
+
+ public Object2IntMap getSegmentIdToReplicationFactor()
+ {
+ return new Object2IntOpenHashMap<>(segmentIdToReplicationFactor);
+ }
+
private int getLoadingReplicants(SegmentId segmentId, String tier)
{
Integer retVal = loadingSegments.get(segmentId, tier);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index 933b63e47263..c89a74edbff8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -77,6 +77,8 @@ public CoordinatorStats run(
targetReplicants.putAll(getTieredReplicants());
currentReplicants.putAll(params.getSegmentReplicantLookup().getClusterTiers(segment.getId()));
+ params.getSegmentReplicantLookup().setReplicationFactor(segment.getId(), getReplicationFactor());
+
final CoordinatorStats stats = new CoordinatorStats();
assign(params, segment, stats);
@@ -93,6 +95,11 @@ public CoordinatorStats run(
}
}
+ private int getReplicationFactor()
+ {
+ return getTieredReplicants().values().stream().reduce(0, Integer::sum);
+ }
+
@Override
public boolean canLoadSegments()
{
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index ade8ab992e47..0583cc87a162 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -32,13 +32,14 @@
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.JettyUtils;
+import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.http.security.DatasourceResourceFilter;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
+import org.apache.druid.timeline.SegmentStatusInCluster;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -69,18 +70,21 @@ public class MetadataResource
private final SegmentsMetadataManager segmentsMetadataManager;
private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private final AuthorizerMapper authorizerMapper;
+ private final DruidCoordinator coordinator;
@Inject
public MetadataResource(
SegmentsMetadataManager segmentsMetadataManager,
IndexerMetadataStorageCoordinator metadataStorageCoordinator,
AuthorizerMapper authorizerMapper,
+ DruidCoordinator coordinator,
@Json ObjectMapper jsonMapper
)
{
this.segmentsMetadataManager = segmentsMetadataManager;
this.metadataStorageCoordinator = metadataStorageCoordinator;
this.authorizerMapper = authorizerMapper;
+ this.coordinator = coordinator;
}
@GET
@@ -140,7 +144,7 @@ public Response getAllUsedSegments(
)
{
if (includeOvershadowedStatus != null) {
- return getAllUsedSegmentsWithOvershadowedStatus(req, dataSources);
+ return getAllUsedSegmentsWithAdditionalDetails(req, dataSources);
}
Collection dataSourcesWithUsedSegments =
@@ -165,7 +169,7 @@ public Response getAllUsedSegments(
return builder.entity(authorizedSegments).build();
}
- private Response getAllUsedSegmentsWithOvershadowedStatus(
+ private Response getAllUsedSegmentsWithAdditionalDetails(
HttpServletRequest req,
@Nullable Set dataSources
)
@@ -184,15 +188,30 @@ private Response getAllUsedSegmentsWithOvershadowedStatus(
.flatMap(t -> t.getSegments().stream());
final Set overshadowedSegments = dataSourcesSnapshot.getOvershadowedSegments();
- final Stream usedSegmentsWithOvershadowedStatus = usedSegments
- .map(segment -> new SegmentWithOvershadowedStatus(segment, overshadowedSegments.contains(segment)));
+ final Stream segmentStatus = usedSegments
+ .map(segment -> {
+ boolean isOvershadowed = overshadowedSegments.contains(segment);
+ Integer replicationFactor;
+ if (isOvershadowed) {
+ // If the segment is overshadowed, the replication factor won't be present in the coordinator, but we know
+ // that it should be 0 as we will unload it soon.
+ replicationFactor = 0;
+ } else {
+ replicationFactor = coordinator.getReplicationFactorForSegment(segment.getId());
+ }
+ return new SegmentStatusInCluster(
+ segment,
+ isOvershadowed,
+ replicationFactor
+ );
+ });
- final Function> raGenerator = segment -> Collections
+ final Function> raGenerator = segment -> Collections
.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSegment().getDataSource()));
- final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+ final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
req,
- usedSegmentsWithOvershadowedStatus::iterator,
+ segmentStatus::iterator,
raGenerator,
authorizerMapper
);
diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
new file mode 100644
index 000000000000..f87b65d606d6
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.DataSourcesSnapshot;
+import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentStatusInCluster;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class MetadataResourceTest
+{
+ private static final String DATASOURCE1 = "datasource1";
+ private static final String DATASOURCE2 = "datasource2";
+
+ private MetadataResource metadataResource;
+
+ private SegmentsMetadataManager segmentsMetadataManager;
+ private DruidCoordinator coordinator;
+ private HttpServletRequest request;
+
+ private final DataSegment dataSegment1 = new DataSegment(
+ DATASOURCE1,
+ Intervals.of("2010-01-01/P1D"),
+ "v0",
+ null,
+ null,
+ null,
+ null,
+ 0x9,
+ 10
+ );
+
+ private final DataSegment dataSegment2 = new DataSegment(
+ DATASOURCE1,
+ Intervals.of("2010-01-22/P1D"),
+ "v0",
+ null,
+ null,
+ null,
+ null,
+ 0x9,
+ 20
+ );
+
+ private final DataSegment dataSegment3 = new DataSegment(
+ DATASOURCE2,
+ Intervals.of("2010-01-01/P1M"),
+ "v0",
+ null,
+ null,
+ null,
+ null,
+ 0x9,
+ 30
+ );
+
+ private final DataSegment dataSegment4 = new DataSegment(
+ DATASOURCE2,
+ Intervals.of("2010-01-02/P1D"),
+ "v0",
+ null,
+ null,
+ null,
+ null,
+ 0x9,
+ 35
+ );
+
+ @Before
+ public void setUp()
+ {
+ request = mock(HttpServletRequest.class);
+ doReturn(mock(AuthenticationResult.class)).when(request).getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT);
+
+ segmentsMetadataManager = mock(SegmentsMetadataManager.class);
+ ImmutableDruidDataSource druidDataSource1 = new ImmutableDruidDataSource(
+ DATASOURCE1,
+ ImmutableMap.of(),
+ ImmutableList.of(
+ dataSegment1,
+ dataSegment2
+ )
+ );
+
+ ImmutableDruidDataSource druidDataSource2 = new ImmutableDruidDataSource(
+ DATASOURCE2,
+ ImmutableMap.of(),
+ ImmutableList.of(
+ dataSegment3,
+ dataSegment4
+ )
+ );
+
+ DataSourcesSnapshot dataSourcesSnapshot = mock(DataSourcesSnapshot.class);
+ doReturn(dataSourcesSnapshot).when(segmentsMetadataManager).getSnapshotOfDataSourcesWithAllUsedSegments();
+ doReturn(ImmutableList.of(druidDataSource1, druidDataSource2)).when(dataSourcesSnapshot).getDataSourcesWithAllUsedSegments();
+
+ coordinator = mock(DruidCoordinator.class);
+ doReturn(2).when(coordinator).getReplicationFactorForSegment(dataSegment1.getId());
+ doReturn(null).when(coordinator).getReplicationFactorForSegment(dataSegment2.getId());
+ doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment3.getId());
+ doReturn(1).when(coordinator).getReplicationFactorForSegment(dataSegment4.getId());
+ doReturn(ImmutableSet.of(dataSegment4)).when(dataSourcesSnapshot).getOvershadowedSegments();
+
+ metadataResource = new MetadataResource(segmentsMetadataManager, mock(IndexerMetadataStorageCoordinator.class), AuthTestUtils.TEST_AUTHORIZER_MAPPER, coordinator, new ObjectMapper());
+ }
+
+ @Test
+ public void testGetAllSegmentsWithOvershadowedStatus()
+ {
+ Response response = metadataResource.getAllUsedSegments(
+ request,
+ null,
+ "includeOvershadowedStatus"
+ );
+
+ List resultList = materializeResponse(response);
+ Assert.assertEquals(4, resultList.size());
+ Assert.assertEquals(new SegmentStatusInCluster(dataSegment1, false, 2), resultList.get(0));
+ Assert.assertEquals(new SegmentStatusInCluster(dataSegment2, false, null), resultList.get(1));
+ Assert.assertEquals(new SegmentStatusInCluster(dataSegment3, false, 1), resultList.get(2));
+ // Replication factor should be 0 as the segment is overshadowed
+ Assert.assertEquals(new SegmentStatusInCluster(dataSegment4, true, 0), resultList.get(3));
+ }
+
+ private List materializeResponse(Response response)
+ {
+ Iterable resultIterator = (Iterable) response.getEntity();
+ List segmentStatusInClusters = new ArrayList<>();
+ resultIterator.forEach(segmentStatusInClusters::add);
+ return segmentStatusInClusters;
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
index 5d97ea394d25..c3a1ab48f5ab 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java
@@ -22,6 +22,8 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.Inject;
@@ -40,7 +42,8 @@
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.sql.calcite.planner.SegmentMetadataCacheConfig;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.SegmentStatusInCluster;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import java.util.Iterator;
@@ -76,7 +79,12 @@ public class MetadataSegmentView
* from other threads.
*/
@MonotonicNonNull
- private volatile ImmutableSortedSet publishedSegments = null;
+ private volatile ImmutableSortedSet publishedSegments = null;
+ /**
+ * Caches the replication factor for segment IDs. In case of coordinator restarts or leadership re-elections, the coordinator API returns `null` replication factor until load rules are evaluated.
+ * The cache can be used during these periods to continue serving the previously fetched values.
+ */
+ private final Cache segmentIdToReplicationFactor;
private final ScheduledExecutorService scheduledExec;
private final long pollPeriodInMS;
private final LifecycleLock lifecycleLock = new LifecycleLock();
@@ -97,6 +105,9 @@ public MetadataSegmentView(
this.isCacheEnabled = config.isMetadataSegmentCacheEnable();
this.pollPeriodInMS = config.getMetadataSegmentPollPeriod();
this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d");
+ this.segmentIdToReplicationFactor = CacheBuilder.newBuilder()
+ .expireAfterAccess(10, TimeUnit.MINUTES)
+ .build();
}
@LifecycleStart
@@ -133,27 +144,34 @@ public void stop()
private void poll()
{
log.info("polling published segments from coordinator");
- final JsonParserIterator metadataSegments = getMetadataSegments(
+ final JsonParserIterator metadataSegments = getMetadataSegments(
coordinatorDruidLeaderClient,
jsonMapper,
segmentWatcherConfig.getWatchedDataSources()
);
- final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder();
+ final ImmutableSortedSet.Builder builder = ImmutableSortedSet.naturalOrder();
while (metadataSegments.hasNext()) {
- final SegmentWithOvershadowedStatus segment = metadataSegments.next();
+ final SegmentStatusInCluster segment = metadataSegments.next();
final DataSegment interned = DataSegmentInterner.intern(segment.getDataSegment());
- final SegmentWithOvershadowedStatus segmentWithOvershadowedStatus = new SegmentWithOvershadowedStatus(
+ Integer replicationFactor = segment.getReplicationFactor();
+ if (replicationFactor == null) {
+ replicationFactor = segmentIdToReplicationFactor.getIfPresent(segment.getDataSegment().getId());
+ } else {
+ segmentIdToReplicationFactor.put(segment.getDataSegment().getId(), segment.getReplicationFactor());
+ }
+ final SegmentStatusInCluster segmentStatusInCluster = new SegmentStatusInCluster(
interned,
- segment.isOvershadowed()
+ segment.isOvershadowed(),
+ replicationFactor
);
- builder.add(segmentWithOvershadowedStatus);
+ builder.add(segmentStatusInCluster);
}
publishedSegments = builder.build();
cachePopulated.countDown();
}
- Iterator getPublishedSegments()
+ Iterator getPublishedSegments()
{
if (isCacheEnabled) {
Uninterruptibles.awaitUninterruptibly(cachePopulated);
@@ -168,7 +186,7 @@ Iterator getPublishedSegments()
}
// Note that coordinator must be up to get segments
- private JsonParserIterator getMetadataSegments(
+ private JsonParserIterator getMetadataSegments(
DruidLeaderClient coordinatorClient,
ObjectMapper jsonMapper,
Set watchedDataSources
@@ -188,7 +206,7 @@ private JsonParserIterator getMetadataSegments(
return SystemSchema.getThingsFromLeaderNode(
query,
- new TypeReference()
+ new TypeReference()
{
},
coordinatorClient,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 3e6cb3e58f20..2c60e9eda1ce 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -81,7 +81,7 @@
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
+import org.apache.druid.timeline.SegmentStatusInCluster;
import org.jboss.netty.handler.codec.http.HttpMethod;
import javax.annotation.Nullable;
@@ -106,8 +106,8 @@ public class SystemSchema extends AbstractSchema
private static final String TASKS_TABLE = "tasks";
private static final String SUPERVISOR_TABLE = "supervisors";
- private static final Function>
- SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment ->
+ private static final Function>
+ SEGMENT_STATUS_IN_CLUSTER_RA_GENERATOR = segment ->
Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(
segment.getDataSegment().getDataSource())
);
@@ -117,6 +117,8 @@ public class SystemSchema extends AbstractSchema
segment.getDataSource())
);
+ private static final long REPLICATION_FACTOR_UNKNOWN = -1L;
+
/**
* Booleans constants represented as long type,
* where 1 = true and 0 = false to make it easy to count number of segments
@@ -150,6 +152,7 @@ public class SystemSchema extends AbstractSchema
.add("dimensions", ColumnType.STRING)
.add("metrics", ColumnType.STRING)
.add("last_compaction_state", ColumnType.STRING)
+ .add("replication_factor", ColumnType.LONG)
.build();
static final RowSignature SERVERS_SIGNATURE = RowSignature
@@ -288,7 +291,7 @@ public Enumerable