diff --git a/docs/api-reference/legacy-metadata-api.md b/docs/api-reference/legacy-metadata-api.md
index 453159c1a582..d22be18a7ec9 100644
--- a/docs/api-reference/legacy-metadata-api.md
+++ b/docs/api-reference/legacy-metadata-api.md
@@ -176,7 +176,11 @@ Returns a list of all segments, overlapping with any of given intervals, for a d
`POST /druid/coordinator/v1/metadata/dataSourceInformation`
-Returns information about the specified datasources, including the datasource schema.
+Returns information about the specified datasources, including the datasource schema.
+
+`POST /druid/coordinator/v1/metadata/bootstrapSegments`
+
+Returns information about bootstrap segments for all datasources. The returned set includes all broadcast segments if broadcast rules are configured.
diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java
index 555e7c67bb1f..a04f3f6512cf 100644
--- a/processing/src/main/java/org/apache/druid/error/DruidException.java
+++ b/processing/src/main/java/org/apache/druid/error/DruidException.java
@@ -331,19 +331,19 @@ public enum Persona
}
/**
- * Category of error. The simplest way to describe this is that it exists as a classification of errors that
+ * Category of error. The simplest way to describe this is that it exists as a classification of errors that
* enables us to identify the expected response code (e.g. HTTP status code) of a specific DruidException
*/
public enum Category
{
/**
* Means that the exception is being created defensively, because we want to validate something but expect that
- * it should never actually be hit. Using this category is good to provide an indication to future reviewers and
+ * it should never actually be hit. Using this category is good to provide an indication to future reviewers and
* developers that the case being checked is not intended to actually be able to occur in the wild.
*/
DEFENSIVE(500),
/**
- * Means that the input provided was malformed in some way. Generally speaking, it is hoped that errors of this
+ * Means that the input provided was malformed in some way. Generally speaking, it is hoped that errors of this
* category have messages written either targeting the USER or ADMIN personas as those are the general users
* of the APIs who could generate invalid inputs.
*/
@@ -356,9 +356,8 @@ public enum Category
* Means that an action that was attempted is forbidden
*/
FORBIDDEN(403),
-
/**
- * Means that the requsted requested resource cannot be found.
+ * Means that the requested resource cannot be found.
*/
NOT_FOUND(404),
/**
diff --git a/server/src/main/java/org/apache/druid/client/BootstrapSegmentsResponse.java b/server/src/main/java/org/apache/druid/client/BootstrapSegmentsResponse.java
new file mode 100644
index 000000000000..7d1569262427
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/BootstrapSegmentsResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.timeline.DataSegment;
+
+public class BootstrapSegmentsResponse
+{
+ private final CloseableIterator iterator;
+
+ public BootstrapSegmentsResponse(final CloseableIterator iterator)
+ {
+ this.iterator = iterator;
+ }
+
+ public CloseableIterator getIterator()
+ {
+ return iterator;
+ }
+
+}
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 00dea3dff0e0..7aa887743979 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -86,6 +86,16 @@ public JsonParserIterator(
this.hasTimeout = timeoutAt > -1;
}
+ /**
+ * Bypasses Jackson serialization to prevent materialization of results from the {@code future} in memory at once.
+ * A shortened version of {@link #JsonParserIterator(JavaType, Future, String, Query, String, ObjectMapper)}
+ * where the URL and host parameters, used solely for logging/errors, are not known.
+ */
+ public JsonParserIterator(JavaType typeRef, Future future, ObjectMapper objectMapper)
+ {
+ this(typeRef, future, "", null, "", objectMapper);
+ }
+
@Override
public boolean hasNext()
{
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index aeccee8043ba..fdf16b2ac505 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -20,6 +20,7 @@
package org.apache.druid.client.coordinator;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.ServiceRetryPolicy;
@@ -58,6 +59,12 @@ public interface CoordinatorClient
*/
ListenableFuture> fetchDataSourceInformation(Set datasources);
+ /**
+ * Fetch bootstrap segments from the coordinator. The results must be streamed back to the caller as the
+ * result set can be large.
+ */
+ ListenableFuture fetchBootstrapSegments();
+
/**
* Returns a new instance backed by a ServiceClient which follows the provided retryPolicy
*/
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
index 93c22bbdbff8..4c795c9dbd47 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java
@@ -21,17 +21,22 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.JsonParserIterator;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
+import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.segment.metadata.DataSourceInformation;
+import org.apache.druid.server.coordination.LoadableDataSegment;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Interval;
@@ -156,6 +161,28 @@ public ListenableFuture> fetchDataSourceInformation(
);
}
+ @Override
+ public ListenableFuture fetchBootstrapSegments()
+ {
+ final String path = "/druid/coordinator/v1/metadata/bootstrapSegments";
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.POST, path),
+ new InputStreamResponseHandler()
+ ),
+ in -> new BootstrapSegmentsResponse(
+ new JsonParserIterator<>(
+ // Some servers, like the Broker, may have PruneLoadSpec set to true for optimization reasons.
+ // We specifically use LoadableDataSegment here instead of DataSegment so the callers can still correctly
+ // load the bootstrap segments, as the load specs are guaranteed not to be pruned.
+ jsonMapper.getTypeFactory().constructType(LoadableDataSegment.class),
+ Futures.immediateFuture(in),
+ jsonMapper
+ )
+ )
+ );
+ }
+
@Override
public CoordinatorClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
index 35276aa723dc..3e3d86ca5f25 100644
--- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
+++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java
@@ -323,9 +323,6 @@ private JsonParserIterator asJsonParserIterator(final InputStream in, fin
return new JsonParserIterator<>(
jsonMapper.getTypeFactory().constructType(clazz),
Futures.immediateFuture(in),
- "", // We don't know URL at this point, but it's OK to use empty; it's used for logs/errors
- null,
- "", // We don't know host at this point, but it's OK to use empty; it's used for logs/errors
jsonMapper
);
}
diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
index 0f27dac9e1bb..27807a5c3c87 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPuller.java
@@ -19,7 +19,6 @@
package org.apache.druid.segment.loading;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.io.Files;
import org.apache.druid.java.util.common.FileUtils;
@@ -117,12 +116,6 @@ public boolean delete()
private static final Logger log = new Logger(LocalDataSegmentPuller.class);
- @VisibleForTesting
- public void getSegmentFiles(DataSegment segment, File dir) throws SegmentLoadingException
- {
- getSegmentFiles(getFile(segment), dir);
- }
-
public FileUtils.FileCopyResult getSegmentFiles(final File sourceFile, final File dir) throws SegmentLoadingException
{
if (sourceFile.isDirectory()) {
diff --git a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
index 4f4f7a5b1d19..2a633b6ad275 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
@@ -19,7 +19,6 @@
package org.apache.druid.server.coordination;
-import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
@@ -59,8 +58,7 @@ public LoadableDataSegment(
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
- @JsonProperty("size") long size,
- @JacksonInject PruneSpecsHolder pruneSpecsHolder
+ @JsonProperty("size") long size
)
{
super(
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index bcd88ee7ee93..4a9086ab5727 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -29,6 +29,9 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.ISE;
@@ -37,6 +40,8 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
@@ -83,6 +88,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ServerTypeConfig serverTypeConfig;
+ private final CoordinatorClient coordinatorClient;
+ private final ServiceEmitter emitter;
private final ConcurrentSkipListSet segmentsToDelete;
private volatile boolean started = false;
@@ -103,7 +110,9 @@ public SegmentLoadDropHandler(
DataSegmentAnnouncer announcer,
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
- ServerTypeConfig serverTypeConfig
+ ServerTypeConfig serverTypeConfig,
+ CoordinatorClient coordinatorClient,
+ ServiceEmitter emitter
)
{
this(
@@ -115,7 +124,9 @@ public SegmentLoadDropHandler(
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
),
- serverTypeConfig
+ serverTypeConfig,
+ coordinatorClient,
+ emitter
);
}
@@ -126,7 +137,9 @@ public SegmentLoadDropHandler(
DataSegmentServerAnnouncer serverAnnouncer,
SegmentManager segmentManager,
ScheduledExecutorService exec,
- ServerTypeConfig serverTypeConfig
+ ServerTypeConfig serverTypeConfig,
+ CoordinatorClient coordinatorClient,
+ ServiceEmitter emitter
)
{
this.config = config;
@@ -135,6 +148,8 @@ public SegmentLoadDropHandler(
this.segmentManager = segmentManager;
this.exec = exec;
this.serverTypeConfig = serverTypeConfig;
+ this.coordinatorClient = coordinatorClient;
+ this.emitter = emitter;
this.segmentsToDelete = new ConcurrentSkipListSet<>();
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
@@ -151,7 +166,7 @@ public void start() throws IOException
log.info("Starting...");
try {
if (segmentManager.canHandleSegments()) {
- bootstrapCachedSegments();
+ loadSegmentsOnStartup();
}
if (shouldAnnounce()) {
@@ -207,12 +222,17 @@ public Map getRowCountDistributionPerDataso
}
/**
- * Bulk loading of cached segments into page cache during bootstrap.
+ * Bulk loading of the following segments into the page cache at startup:
+ * Previously cached segments
+ * Bootstrap segments from the coordinator
*/
- private void bootstrapCachedSegments() throws IOException
+ private void loadSegmentsOnStartup() throws IOException
{
+ final List segmentsOnStartup = new ArrayList<>();
+ segmentsOnStartup.addAll(segmentManager.getCachedSegments());
+ segmentsOnStartup.addAll(getBootstrapSegments());
+
final Stopwatch stopwatch = Stopwatch.createStarted();
- final List segments = segmentManager.getCachedSegments();
// Start a temporary thread pool to load segments into page cache during bootstrap
final ExecutorService loadingExecutor = Execs.multiThreaded(
@@ -224,11 +244,11 @@ private void bootstrapCachedSegments() throws IOException
backgroundSegmentAnnouncer.startAnnouncing();
- final int numSegments = segments.size();
+ final int numSegments = segmentsOnStartup.size();
final CountDownLatch latch = new CountDownLatch(numSegments);
final AtomicInteger counter = new AtomicInteger(0);
final CopyOnWriteArrayList failedSegments = new CopyOnWriteArrayList<>();
- for (final DataSegment segment : segments) {
+ for (final DataSegment segment : segmentsOnStartup) {
loadingExecutor.submit(
() -> {
try {
@@ -269,7 +289,7 @@ private void bootstrapCachedSegments() throws IOException
latch.await();
if (failedSegments.size() > 0) {
- log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
+ log.makeAlert("[%,d] errors seen while loading segments on startup", failedSegments.size())
.addData("failedSegments", failedSegments)
.emit();
}
@@ -282,8 +302,8 @@ private void bootstrapCachedSegments() throws IOException
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
- log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
- .addData("numSegments", segments.size())
+ log.makeAlert(e, "Failed to load segments on startup -- likely problem with announcing.")
+ .addData("numSegments", segmentsOnStartup.size())
.emit();
}
finally {
@@ -292,10 +312,41 @@ private void bootstrapCachedSegments() throws IOException
// At this stage, all tasks have been submitted, send a shutdown command to cleanup any resources alloted
// for the bootstrapping function.
segmentManager.shutdownBootstrap();
- log.info("Cache load of [%d] bootstrap segments took [%,d]ms.", segments.size(), stopwatch.millisElapsed());
+ log.info("Loaded [%d] segments on startup in [%,d]ms.", segmentsOnStartup.size(), stopwatch.millisElapsed());
}
}
+ /**
+ * @return a list of bootstrap segments. When bootstrap segments cannot be found, an empty list is returned.
+ */
+ private List getBootstrapSegments()
+ {
+ log.info("Fetching bootstrap segments from the coordinator.");
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+
+ List bootstrapSegments = new ArrayList<>();
+
+ try {
+ final BootstrapSegmentsResponse response =
+ FutureUtils.getUnchecked(coordinatorClient.fetchBootstrapSegments(), true);
+ bootstrapSegments = ImmutableList.copyOf(response.getIterator());
+ }
+ catch (Exception e) {
+ // By default, we "fail open" when there is any error -- finding the coordinator, or if the API endpoint cannot
+ // be found during rolling upgrades, or even if it's irrecoverable.
+ log.warn("Error fetching bootstrap segments from the coordinator: [%s]. ", e.getMessage());
+ }
+ finally {
+ stopwatch.stop();
+ final long fetchRunMillis = stopwatch.millisElapsed();
+ emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/time", fetchRunMillis));
+ emitter.emit(new ServiceMetricEvent.Builder().setMetric("segment/bootstrap/count", bootstrapSegments.size()));
+ log.info("Fetched [%d] bootstrap segments in [%d]ms.", bootstrapSegments.size(), fetchRunMillis);
+ }
+
+ return bootstrapSegments;
+ }
+
@Override
public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
{
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 0787bc8f7d4f..36cfac8089c4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -174,6 +174,12 @@ public class DruidCoordinator
*/
private volatile SegmentReplicationStatus segmentReplicationStatus = null;
+ /**
+ * Set of broadcast segments determined in the latest coordinator run of the {@link RunRules} duty.
+ * This might contain stale information if the Coordinator duties haven't run or are delayed.
+ */
+ private volatile Set broadcastSegments = null;
+
public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
@@ -315,6 +321,16 @@ public Map getDatasourceToLoadStatus()
return loadStatus;
}
+ /**
+ * @return Set of broadcast segments determined by the latest run of the {@link RunRules} duty.
+ * If the coordinator runs haven't triggered or are delayed, this information may be stale.
+ */
+ @Nullable
+ public Set getBroadcastSegments()
+ {
+ return broadcastSegments;
+ }
+
@Nullable
public Integer getReplicationFactor(SegmentId segmentId)
{
@@ -798,6 +814,7 @@ private class UpdateReplicationStatus implements CoordinatorDuty
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
+ broadcastSegments = params.getBroadcastSegments();
segmentReplicationStatus = params.getSegmentReplicationStatus();
// Collect stats for unavailable and under-replicated segments
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 5548636b0000..ebdbd4f500e8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -123,6 +123,12 @@ public SegmentReplicationStatus getSegmentReplicationStatus()
return segmentAssigner == null ? null : segmentAssigner.getReplicationStatus();
}
+ @Nullable
+ public Set getBroadcastSegments()
+ {
+ return segmentAssigner == null ? null : segmentAssigner.getBroadcastSegments();
+ }
+
public StrategicSegmentAssigner getSegmentAssigner()
{
return segmentAssigner;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index 1c2a867c4faf..9b5d38f198eb 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -69,6 +69,7 @@ public class StrategicSegmentAssigner implements SegmentActionHandler
private final Map tierToHistoricalCount = new HashMap<>();
private final Map> segmentsToDelete = new HashMap<>();
private final Map> segmentsWithZeroRequiredReplicas = new HashMap<>();
+ private final Set broadcastSegments = new HashSet<>();
public StrategicSegmentAssigner(
SegmentLoadQueueManager loadQueueManager,
@@ -361,6 +362,8 @@ public void broadcastSegment(DataSegment segment)
entry -> replicaCountMap.computeIfAbsent(segment.getId(), entry.getKey())
.setRequired(entry.getIntValue(), entry.getIntValue())
);
+
+ broadcastSegments.add(segment);
}
@Override
@@ -398,6 +401,11 @@ private boolean loadBroadcastSegment(DataSegment segment, ServerHolder server)
return false;
}
+ public Set getBroadcastSegments()
+ {
+ return broadcastSegments;
+ }
+
/**
* Drops the broadcast segment if it is loaded on the given server.
* Returns true only if the segment was successfully queued for drop on the server.
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index d8b00c318db6..a1cccd2b784a 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -471,4 +471,23 @@ public Response getDataSourceInformation(
);
return Response.status(Response.Status.OK).entity(authorizedDataSourceInformation).build();
}
+
+ /**
+ * @return all bootstrap segments determined by the coordinator.
+ */
+ @POST
+ @Path("/bootstrapSegments")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(DatasourceResourceFilter.class)
+ public Response getBootstrapSegments()
+ {
+ final Set broadcastSegments = coordinator.getBroadcastSegments();
+ if (broadcastSegments == null) {
+ return Response.status(Response.Status.SERVICE_UNAVAILABLE)
+ .entity("Bootstrap segments are not initialized yet."
+ + " Please ensure that the Coordinator duties are running and try again.")
+ .build();
+ }
+ return Response.status(Response.Status.OK).entity(broadcastSegments).build();
+ }
}
diff --git a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
index 62af96d4e0eb..3dde6dda149b 100644
--- a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
+++ b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java
@@ -24,7 +24,12 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Injector;
+import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.guice.StartupInjectorBuilder;
+import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@@ -37,6 +42,7 @@
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.PruneLoadSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@@ -58,6 +64,24 @@ public class CoordinatorClientImplTest
private MockServiceClient serviceClient;
private CoordinatorClient coordinatorClient;
+ private static final DataSegment SEGMENT1 = DataSegment.builder()
+ .dataSource("xyz")
+ .interval(Intervals.of("1000/2000"))
+ .version("1")
+ .loadSpec(ImmutableMap.of("type", "local", "loc", "foo"))
+ .shardSpec(new NumberedShardSpec(0, 1))
+ .size(1)
+ .build();
+
+ private static final DataSegment SEGMENT2 = DataSegment.builder()
+ .dataSource("xyz")
+ .interval(Intervals.of("2000/3000"))
+ .version("1")
+ .loadSpec(ImmutableMap.of("type", "local", "loc", "bar"))
+ .shardSpec(new NumberedShardSpec(0, 1))
+ .size(1)
+ .build();
+
@Before
public void setup()
{
@@ -181,6 +205,82 @@ public void test_fetchUsedSegments() throws Exception
);
}
+ @Test
+ public void test_fetchBootstrapSegments() throws Exception
+ {
+ final List expectedSegments = ImmutableList.of(SEGMENT1, SEGMENT2);
+
+ serviceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.POST, "/druid/coordinator/v1/metadata/bootstrapSegments"),
+ HttpResponseStatus.OK,
+ ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+ jsonMapper.writeValueAsBytes(expectedSegments)
+ );
+
+ final ListenableFuture response = coordinatorClient.fetchBootstrapSegments();
+ Assert.assertNotNull(response);
+
+ final ImmutableList observedDataSegments = ImmutableList.copyOf(response.get().getIterator());
+ for (int idx = 0; idx < expectedSegments.size(); idx++) {
+ Assert.assertEquals(expectedSegments.get(idx).getLoadSpec(), observedDataSegments.get(idx).getLoadSpec());
+ }
+ }
+
+ /**
+ * Set up a Guice injector with PruneLoadSpec set to true. This test verifies that the bootstrap segments API
+ * always return segments with load specs present, ensuring they can be loaded anywhere.
+ */
+ @Test
+ public void test_fetchBootstrapSegmentsAreLoadableWhenPruneLoadSpecIsEnabled() throws Exception
+ {
+ final List expectedSegments = ImmutableList.of(SEGMENT1, SEGMENT2);
+
+ // Set up a coordinator client with PruneLoadSpec set to true in the injector
+ final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
+ .addModule(binder -> binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true))
+ .build();
+
+ final ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
+ final CoordinatorClient coordinatorClient = new CoordinatorClientImpl(serviceClient, objectMapper);
+
+ serviceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.POST, "/druid/coordinator/v1/metadata/bootstrapSegments"),
+ HttpResponseStatus.OK,
+ ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+ objectMapper.writeValueAsBytes(expectedSegments)
+ );
+
+ final ListenableFuture response = coordinatorClient.fetchBootstrapSegments();
+ Assert.assertNotNull(response);
+
+ final ImmutableList observedDataSegments = ImmutableList.copyOf(response.get().getIterator());
+ Assert.assertEquals(expectedSegments, observedDataSegments);
+ for (int idx = 0; idx < expectedSegments.size(); idx++) {
+ Assert.assertEquals(expectedSegments.get(idx).getLoadSpec(), observedDataSegments.get(idx).getLoadSpec());
+ }
+ }
+
+ @Test
+ public void test_fetchEmptyBootstrapSegments() throws Exception
+ {
+ final List segments = ImmutableList.of();
+
+ serviceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.POST, "/druid/coordinator/v1/metadata/bootstrapSegments"),
+ HttpResponseStatus.OK,
+ ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+ jsonMapper.writeValueAsBytes(segments)
+ );
+
+ final ListenableFuture response = coordinatorClient.fetchBootstrapSegments();
+ Assert.assertNotNull(response);
+
+ Assert.assertEquals(
+ segments,
+ ImmutableList.copyOf(response.get().getIterator())
+ );
+ }
+
@Test
public void test_fetchDataSourceInformation() throws Exception
{
diff --git a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
index 7b7d253ef6d5..5aee343a851b 100644
--- a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
+++ b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java
@@ -20,6 +20,7 @@
package org.apache.druid.client.coordinator;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.BootstrapSegmentsResponse;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.rpc.ServiceRetryPolicy;
@@ -62,6 +63,12 @@ public ListenableFuture> fetchDataSourceInformation(
throw new UnsupportedOperationException();
}
+ @Override
+ public ListenableFuture fetchBootstrapSegments()
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public CoordinatorClient withRetryPolicy(ServiceRetryPolicy retryPolicy)
{
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
index b7ce3b8e0588..f6b1c39c59da 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerCacheTest.java
@@ -23,6 +23,8 @@
import com.google.common.collect.ImmutableList;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -34,7 +36,6 @@
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.TestSegmentUtils;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Before;
@@ -63,6 +64,8 @@ public class SegmentLoadDropHandlerCacheTest
private SegmentManager segmentManager;
private SegmentLoaderConfig loaderConfig;
private SegmentLocalCacheManager cacheManager;
+ private TestCoordinatorClient coordinatorClient;
+ private ServiceEmitter emitter;
private ObjectMapper objectMapper;
@Before
@@ -100,7 +103,9 @@ public List getLocations()
segmentManager = new SegmentManager(cacheManager);
segmentAnnouncer = new TestDataSegmentAnnouncer();
serverAnnouncer = new TestDataServerAnnouncer();
- EmittingLogger.registerEmitter(new NoopServiceEmitter());
+ coordinatorClient = new TestCoordinatorClient();
+ emitter = new StubServiceEmitter();
+ EmittingLogger.registerEmitter(emitter);
}
@Test
@@ -122,7 +127,9 @@ public void testLoadStartStopWithEmptyLocations() throws IOException
segmentAnnouncer,
serverAnnouncer,
segmentManager,
- new ServerTypeConfig(ServerType.BROKER)
+ new ServerTypeConfig(ServerType.BROKER),
+ coordinatorClient,
+ emitter
);
loadDropHandler.start();
@@ -140,7 +147,9 @@ public void testLoadStartStop() throws IOException
segmentAnnouncer,
serverAnnouncer,
segmentManager,
- new ServerTypeConfig(ServerType.BROKER)
+ new ServerTypeConfig(ServerType.BROKER),
+ coordinatorClient,
+ emitter
);
loadDropHandler.start();
@@ -171,7 +180,9 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException
segmentAnnouncer,
serverAnnouncer,
segmentManager,
- new ServerTypeConfig(ServerType.HISTORICAL)
+ new ServerTypeConfig(ServerType.HISTORICAL),
+ coordinatorClient,
+ emitter
);
// Start the load drop handler
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 1a776c6c34a1..9fe04d60d5bf 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -22,12 +22,15 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
@@ -37,7 +40,6 @@
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.TestSegmentUtils;
import org.apache.druid.server.coordination.SegmentChangeStatus.State;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -72,6 +74,8 @@ public class SegmentLoadDropHandlerTest
private List scheduledRunnable;
private SegmentLoaderConfig segmentLoaderConfig;
private ScheduledExecutorFactory scheduledExecutorFactory;
+ private TestCoordinatorClient coordinatorClient;
+ private StubServiceEmitter serviceEmitter;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -136,7 +140,9 @@ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit)
};
};
- EmittingLogger.registerEmitter(new NoopServiceEmitter());
+ coordinatorClient = new TestCoordinatorClient();
+ serviceEmitter = new StubServiceEmitter();
+ EmittingLogger.registerEmitter(serviceEmitter);
}
/**
@@ -293,6 +299,71 @@ public void testLoadCache() throws Exception
Assert.assertEquals(1, cacheManager.observedShutdownBootstrapCount.get());
}
+ @Test
+ public void testLoadBootstrapSegments() throws Exception
+ {
+ final Set segments = new HashSet<>();
+ for (int i = 0; i < COUNT; ++i) {
+ segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-01")));
+ segments.add(makeSegment("test" + i, "1", Intervals.of("P1d/2011-04-02")));
+ segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-01")));
+ segments.add(makeSegment("test_two" + i, "1", Intervals.of("P1d/2011-04-02")));
+ }
+
+ final TestCoordinatorClient coordinatorClient = new TestCoordinatorClient(segments);
+ final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
+ final SegmentManager segmentManager = new SegmentManager(cacheManager);
+
+ final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager, coordinatorClient);
+
+ Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+ handler.start();
+
+ Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+ Assert.assertFalse(segmentManager.getDataSourceCounts().isEmpty());
+
+ for (int i = 0; i < COUNT; ++i) {
+ Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test" + i).longValue());
+ Assert.assertEquals(2L, segmentManager.getDataSourceCounts().get("test_two" + i).longValue());
+ }
+
+ final ImmutableList expectedBootstrapSegments = ImmutableList.copyOf(segments);
+
+ Assert.assertEquals(expectedBootstrapSegments, segmentAnnouncer.getObservedSegments());
+
+ Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegments);
+ Assert.assertEquals(expectedBootstrapSegments, cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
+ serviceEmitter.verifyValue("segment/bootstrap/count", expectedBootstrapSegments.size());
+ serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
+
+ handler.stop();
+ }
+
+ @Test
+ public void testLoadBootstrapSegmentsWhenExceptionThrown() throws Exception
+ {
+ final TestSegmentCacheManager cacheManager = new TestSegmentCacheManager();
+ final SegmentManager segmentManager = new SegmentManager(cacheManager);
+
+ final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(segmentManager, new NoopCoordinatorClient());
+
+ Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+ handler.start();
+
+ Assert.assertEquals(1, serverAnnouncer.getObservedCount());
+ Assert.assertTrue(segmentManager.getDataSourceCounts().isEmpty());
+
+ Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments());
+ Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegments);
+ Assert.assertEquals(ImmutableList.of(), cacheManager.observedBootstrapSegmentsLoadedIntoPageCache);
+ serviceEmitter.verifyValue("segment/bootstrap/count", 0);
+ serviceEmitter.verifyEmitted("segment/bootstrap/time", 1);
+
+ handler.stop();
+ }
+
@Test
public void testStartStop() throws Exception
{
@@ -467,7 +538,8 @@ public int getDropSegmentDelayMillis()
final SegmentLoadDropHandler handler = initSegmentLoadDropHandler(
noAnnouncerSegmentLoaderConfig,
- segmentManager
+ segmentManager,
+ coordinatorClient
);
handler.start();
@@ -543,12 +615,21 @@ public int getDropSegmentDelayMillis()
Assert.assertEquals(0, serverAnnouncer.getObservedCount());
}
+ private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager segmentManager, CoordinatorClient coordinatorClient)
+ {
+ return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager, coordinatorClient);
+ }
+
private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentManager segmentManager)
{
- return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager);
+ return initSegmentLoadDropHandler(segmentLoaderConfig, segmentManager, coordinatorClient);
}
- private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentLoaderConfig config, SegmentManager segmentManager)
+ private SegmentLoadDropHandler initSegmentLoadDropHandler(
+ SegmentLoaderConfig config,
+ SegmentManager segmentManager,
+ CoordinatorClient coordinatorClient
+ )
{
return new SegmentLoadDropHandler(
config,
@@ -556,7 +637,9 @@ private SegmentLoadDropHandler initSegmentLoadDropHandler(SegmentLoaderConfig co
serverAnnouncer,
segmentManager,
scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
- new ServerTypeConfig(ServerType.HISTORICAL)
+ new ServerTypeConfig(ServerType.HISTORICAL),
+ coordinatorClient,
+ serviceEmitter
);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/TestCoordinatorClient.java b/server/src/test/java/org/apache/druid/server/coordination/TestCoordinatorClient.java
new file mode 100644
index 000000000000..9f297ddd39eb
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordination/TestCoordinatorClient.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.BootstrapSegmentsResponse;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.HashSet;
+import java.util.Set;
+
+class TestCoordinatorClient extends NoopCoordinatorClient
+{
+ private final Set bootstrapSegments;
+
+ TestCoordinatorClient()
+ {
+ this(new HashSet<>());
+ }
+
+ TestCoordinatorClient(final Set bootstrapSegments)
+ {
+ this.bootstrapSegments = bootstrapSegments;
+ }
+
+ @Override
+ public ListenableFuture fetchBootstrapSegments()
+ {
+ return Futures.immediateFuture(
+ new BootstrapSegmentsResponse(CloseableIterators.withEmptyBaggage(bootstrapSegments.iterator()))
+ );
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index f0f2bd3b4e44..9f5291af598f 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -106,7 +106,9 @@ public void testLoadDrop() throws Exception
EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
EasyMock.createNiceMock(SegmentManager.class),
EasyMock.createNiceMock(ScheduledExecutorService.class),
- new ServerTypeConfig(ServerType.HISTORICAL)
+ new ServerTypeConfig(ServerType.HISTORICAL),
+ new TestCoordinatorClient(),
+ new NoopServiceEmitter()
)
{
@Override
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index a5ed1616a6d0..8c7e0ae14e5f 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -82,6 +82,7 @@
import javax.annotation.Nullable;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -266,6 +267,7 @@ public void testCoordinatorRun() throws Exception
coordinator.start();
Assert.assertNull(coordinator.getReplicationFactor(dataSegment.getId()));
+ Assert.assertNull(coordinator.getBroadcastSegments());
// Wait for this coordinator to become leader
leaderAnnouncerLatch.await();
@@ -293,6 +295,7 @@ public void testCoordinatorRun() throws Exception
coordinator.getDatasourceToUnavailableSegmentCount();
Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.size());
Assert.assertEquals(0, numsUnavailableUsedSegmentsPerDataSource.getInt(dataSource));
+ Assert.assertEquals(0, coordinator.getBroadcastSegments().size());
Map> underReplicationCountsPerDataSourcePerTier =
coordinator.getTierToDatasourceToUnderReplicatedCount(false);
@@ -571,6 +574,7 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith
coordinatorRunLatch.await();
Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getDatasourceToLoadStatus());
+ Assert.assertEquals(new HashSet<>(dataSegments.values()), coordinator.getBroadcastSegments());
// Under-replicated counts are updated only after the next coordinator run
Map> underReplicationCountsPerDataSourcePerTier =
diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
index 1b86bbca4536..4d6bbf5929be 100644
--- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
@@ -486,6 +486,35 @@ public void testGetSegment()
);
}
+ @Test
+ public void testGetBootstrapSegments()
+ {
+ Mockito.doReturn(ImmutableSet.of(segments[0], segments[1])).when(coordinator).getBroadcastSegments();
+
+ Response response = metadataResource.getBootstrapSegments();
+ final List observedSegments = extractResponseList(response);
+ Assert.assertEquals(2, observedSegments.size());
+ }
+
+ @Test
+ public void testEmptyGetBootstrapSegments()
+ {
+ Mockito.doReturn(ImmutableSet.of()).when(coordinator).getBroadcastSegments();
+
+ Response response = metadataResource.getBootstrapSegments();
+ final List observedSegments = extractResponseList(response);
+ Assert.assertEquals(0, observedSegments.size());
+ }
+
+ @Test
+ public void testNullGetBootstrapSegments()
+ {
+ Mockito.doReturn(null).when(coordinator).getBroadcastSegments();
+
+ Response response = metadataResource.getBootstrapSegments();
+ Assert.assertEquals(503, response.getStatus());
+ }
+
private List extractResponseList(Response response)
{
return Lists.newArrayList(