diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index 58361aba7538..df4677c3bc8f 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; +import org.apache.druid.client.BrokerViewOfCoordinatorConfig; import org.apache.druid.client.CachingClusteredClient; import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidServer; @@ -53,6 +54,7 @@ import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.BaseQuery; import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; @@ -100,8 +102,11 @@ import org.apache.druid.segment.generator.GeneratorBasicSchemas; import org.apache.druid.segment.generator.GeneratorSchemaInfo; import org.apache.druid.segment.generator.SegmentGenerator; +import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryStackTests; +import org.apache.druid.server.ResourceIdPopulatingQueryRunner; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestCoordinatorClient; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; @@ -176,6 +181,7 @@ public class CachingClusteredClientBenchmark private final QuerySegmentSpec basicSchemaIntervalSpec = 
new MultipleIntervalSegmentSpec( Collections.singletonList(basicSchema.getDataInterval()) ); + private final BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient()); private final int numProcessingThreads = 4; @@ -222,6 +228,7 @@ public void setup() rowsPerSegment ); queryableIndexes.put(dataSegment, index); + filter.start(); } final DruidProcessingConfig processingConfig = new DruidProcessingConfig() @@ -368,18 +375,21 @@ public void tearDown() throws IOException @OutputTimeUnit(TimeUnit.MICROSECONDS) public void timeseriesQuery(Blackhole blackhole) { - query = Druids.newTimeseriesQueryBuilder() - .dataSource(DATA_SOURCE) - .intervals(basicSchemaIntervalSpec) - .aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")) - .granularity(Granularity.fromString(queryGranularity)) - .context( - ImmutableMap.of( - QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine, - QueryContexts.BROKER_PARALLELISM, parallelism - ) - ) - .build(); + Query q = Druids.newTimeseriesQueryBuilder() + .dataSource(DATA_SOURCE) + .intervals(basicSchemaIntervalSpec) + .aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential")) + .granularity(Granularity.fromString(queryGranularity)) + .context( + ImmutableMap.of( + BaseQuery.QUERY_ID, "BenchmarkQuery", + QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine, + QueryContexts.BROKER_PARALLELISM, parallelism + ) + ) + .build(); + + query = prepareQuery(q); final List> results = runQuery(); @@ -393,7 +403,7 @@ public void timeseriesQuery(Blackhole blackhole) @OutputTimeUnit(TimeUnit.MICROSECONDS) public void topNQuery(Blackhole blackhole) { - query = new TopNQueryBuilder() + Query q = new TopNQueryBuilder() .dataSource(DATA_SOURCE) .intervals(basicSchemaIntervalSpec) .dimension(new DefaultDimensionSpec("dimZipf", null)) @@ -403,12 +413,15 @@ public void topNQuery(Blackhole blackhole) .threshold(10_000) // we are primarily measuring 'broker' 
merge time, so collect a significant number of results .context( ImmutableMap.of( + BaseQuery.QUERY_ID, "BenchmarkQuery", QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine, QueryContexts.BROKER_PARALLELISM, parallelism ) ) .build(); + query = prepareQuery(q); + final List> results = runQuery(); for (Result result : results) { @@ -421,7 +434,7 @@ public void topNQuery(Blackhole blackhole) @OutputTimeUnit(TimeUnit.MICROSECONDS) public void groupByQuery(Blackhole blackhole) { - query = GroupByQuery + Query q = GroupByQuery .builder() .setDataSource(DATA_SOURCE) .setQuerySegmentSpec(basicSchemaIntervalSpec) @@ -433,12 +446,15 @@ public void groupByQuery(Blackhole blackhole) .setGranularity(Granularity.fromString(queryGranularity)) .setContext( ImmutableMap.of( + BaseQuery.QUERY_ID, "BenchmarkQuery", QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine, QueryContexts.BROKER_PARALLELISM, parallelism ) ) .build(); + query = prepareQuery(q); + final List results = runQuery(); for (ResultRow result : results) { @@ -446,6 +462,17 @@ public void groupByQuery(Blackhole blackhole) } } + private Query prepareQuery(Query query) + { + return ResourceIdPopulatingQueryRunner.populateResourceId(query) + .withDataSource(ClientQuerySegmentWalker.generateSubqueryIds( + query.getDataSource(), + query.getId(), + query.getSqlQueryId(), + query.context().getString(QueryContexts.QUERY_RESOURCE_ID) + )); + } + private List runQuery() { //noinspection unchecked @@ -496,7 +523,7 @@ void addSegmentToServer(DruidServer server, DataSegment segment) { final ServerSelector selector = selectors.computeIfAbsent( segment.getId().toString(), - k -> new ServerSelector(segment, tierSelectorStrategy) + k -> new ServerSelector(segment, tierSelectorStrategy, filter) ); selector.addServerAndUpdateSegment(servers.get(server), segment); timelines.computeIfAbsent(segment.getDataSource(), k -> new VersionedIntervalTimeline<>(Ordering.natural())) diff --git a/docs/api-reference/service-status-api.md 
b/docs/api-reference/service-status-api.md index 6e801ae632e1..48679658d440 100644 --- a/docs/api-reference/service-status-api.md +++ b/docs/api-reference/service-status-api.md @@ -658,6 +658,138 @@ Host: http://COORDINATOR_IP:COORDINATOR_PORT + +### Get Historical Cloning Status + +Retrieves the current status of Historical cloning from the Coordinator. + +#### URL + +`GET` `/druid/coordinator/v1/config/cloneStatus` + +#### Responses + + + + + + +
+ +*Successfully retrieved cloning status* + +
+
+ +#### Sample request + + + + + + +```shell +curl "http://COORDINATOR_IP:COORDINATOR_PORT/druid/coordinator/v1/config/cloneStatus" +``` + + + + + +```http +GET /druid/coordinator/v1/config/cloneStatus HTTP/1.1 +Host: http://COORDINATOR_IP:COORDINATOR_PORT +``` + + + + +#### Sample response + +
+ View the response + +```json +{ + "cloneStatus": [ + { + "sourceServer": "localhost:8089", + "targetServer": "localhost:8083", + "state": "IN_PROGRESS", + "segmentLoadsRemaining": 0, + "segmentDropsRemaining": 0, + "bytesToLoad": 0 + } + ] +} +``` + +
+ +### Get Broker dynamic configuration view + +Retrieves the list of Brokers which have an up-to-date view of Coordinator dynamic configuration. + +#### URL + +`GET` `/druid/coordinator/v1/config/syncedBrokers` + +#### Responses + + + + + + +
+ +*Successfully retrieved Broker configuration view* + 
+
+ +#### Sample request + + + + + + +```shell +curl "http://COORDINATOR_IP:COORDINATOR_PORT/druid/coordinator/v1/config/syncedBrokers" +``` + + + + + +```http +GET /druid/coordinator/v1/config/syncedBrokers HTTP/1.1 +Host: http://COORDINATOR_IP:COORDINATOR_PORT +``` + + + + +#### Sample response + +
+ View the response + +```json +{ + "syncedBrokers": [ + { + "host": "localhost", + "port": 8082, + "syncTimeInMs": 1745756337472 + } + ] +} +``` + +
+ ## Overlord ### Get Overlord leader address @@ -1334,4 +1466,4 @@ Host: http://BROKER_IP:BROKER_PORT #### Sample response -A successful response to this endpoint results in an empty response body. \ No newline at end of file +A successful response to this endpoint results in an empty response body. diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md index f62c7f8334f6..78aec8f9f286 100644 --- a/docs/querying/query-context.md +++ b/docs/querying/query-context.md @@ -66,6 +66,7 @@ See [SQL query context](sql-query-context.md) for other query context parameters |`debug`| `false` | Flag indicating whether to enable debugging outputs for the query. When set to false, no additional logs will be produced (logs produced will be entirely dependent on your logging level). When set to true, the following addition logs will be produced:
- Log the stack trace of the exception (if any) produced by the query | |`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. | |`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.| +|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones; Historicals which are not involved in the cloning process will still be queried. `includeClones` means that the broker queries any Historical without regarding clone status. 
This parameter only affects native queries; MSQ does not query Historicals directly, and Dart will not respect this context parameter.| ## Parameters by query type diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java index 606c6a529587..1ec98359db79 100644 --- a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java +++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java @@ -31,6 +31,7 @@ import org.apache.druid.client.BatchServerInventoryView; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.BrokerServerView; +import org.apache.druid.client.BrokerViewOfCoordinatorConfig; import org.apache.druid.client.DirectDruidClientFactory; import org.apache.druid.client.DruidServer; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; @@ -58,6 +59,7 @@ import org.apache.druid.segment.realtime.appenderator.SegmentSchemas; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestCoordinatorClient; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -319,12 +321,15 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) EasyMock.createMock(HttpClient.class) ); + BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient()); + filter.start(); brokerServerView = new BrokerServerView( druidClientFactory, baseView, new 
HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), new NoopServiceEmitter(), - new BrokerSegmentWatcherConfig() + new BrokerSegmentWatcherConfig(), + filter ); baseView.start(); } diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index dd5f79eecf5d..caa402a142ec 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -316,7 +316,6 @@ public long getMaxQueuedBytes() return 0L; } }; - CachingClusteredClient baseClient = new CachingClusteredClient( conglomerate, new TimelineServerView() diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java index f276caa435e6..00813d0f2fb7 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java @@ -39,6 +39,7 @@ import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.input.table.TableInputSpec; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.filter.DimFilterUtils; import org.apache.druid.server.coordination.DruidServerMetadata; @@ -162,7 +163,8 @@ public List sliceDynamic( */ int findWorkerForServerSelector(final ServerSelector serverSelector, final int maxNumSlices) { - 
final QueryableDruidServer server = serverSelector.pick(null); + // Currently, Dart does not support clone query modes, all servers can be queried. + final QueryableDruidServer server = serverSelector.pick(null, CloneQueryMode.INCLUDECLONES); if (server == null) { return UNKNOWN; @@ -279,7 +281,8 @@ static boolean shouldIncludeSegment(final ServerSelector serverSelector) int numRealtimeServers = 0; int numOtherServers = 0; - for (final DruidServerMetadata server : serverSelector.getAllServers()) { + // Currently, Dart does not support clone query modes, all servers can be queried. + for (final DruidServerMetadata server : serverSelector.getAllServers(CloneQueryMode.INCLUDECLONES)) { if (SegmentSource.REALTIME.getUsedServerTypes().contains(server.getType())) { numRealtimeServers++; } else { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java index 92ce2bd2a1c4..a65c1252c6eb 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java @@ -28,6 +28,7 @@ import org.apache.druid.client.QueryableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; +import org.apache.druid.client.selector.HistoricalFilter; import org.apache.druid.client.selector.RandomServerSelectorStrategy; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.data.input.StringTuple; @@ -214,7 +215,8 @@ void setUp() final IntList segmentServers = entry.getValue(); final ServerSelector serverSelector = new ServerSelector( dataSegment, - new HighestPriorityTierSelectorStrategy(new 
RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); for (int serverNumber : segmentServers) { final DruidServerMetadata serverMetadata = SERVERS.get(serverNumber); diff --git a/processing/src/main/java/org/apache/druid/query/CloneQueryMode.java b/processing/src/main/java/org/apache/druid/query/CloneQueryMode.java new file mode 100644 index 000000000000..643c1dcb4036 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/CloneQueryMode.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.druid.error.InvalidInput; + +/** + * Enum used in the query context to determine if clone queries should be used by native queries. + */ +public enum CloneQueryMode +{ + /** + * For each ongoing cloning, do not query the source server that is being cloned. Other servers which are not + * participating in any cloning will still be queried. + */ + PREFERCLONES("preferClones"), + /** + * Consider both clones and their source servers for querying. 
+ */ + INCLUDECLONES("includeClones"), + /** + * Do not query clone servers. + */ + EXCLUDECLONES("excludeClones"); + + private final String name; + + CloneQueryMode(String name) + { + this.name = name; + } + + @JsonCreator + public static CloneQueryMode fromString(String value) + { + for (CloneQueryMode mode : values()) { + if (mode.toString().equals(value)) { + return mode; + } + } + + throw InvalidInput.exception("No such clone query mode[%s]", value); + } + + @Override + public String toString() + { + return name; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java b/processing/src/main/java/org/apache/druid/query/QueryContext.java index 5a1499c6c8b7..17ddcd3aac03 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContext.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java @@ -552,6 +552,15 @@ public boolean getEnableJoinFilterRewriteValueColumnFilters() ); } + public CloneQueryMode getCloneQueryMode() + { + return getEnum( + QueryContexts.CLONE_QUERY_MODE, + CloneQueryMode.class, + QueryContexts.DEFAULT_CLONE_QUERY_MODE + ); + } + public boolean getEnableRewriteJoinToFilter() { return getBoolean( diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java b/processing/src/main/java/org/apache/druid/query/QueryContexts.java index a2a81c6eb746..ef48d67fbf54 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java +++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java @@ -65,6 +65,7 @@ public class QueryContexts public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = "joinFilterRewriteMaxSize"; public static final String MAX_NUMERIC_IN_FILTERS = "maxNumericInFilters"; public static final String CURSOR_AUTO_ARRANGE_FILTERS = "cursorAutoArrangeFilters"; + public static final String CLONE_QUERY_MODE = "cloneQueryMode"; // This flag controls whether a SQL join query with left scan should be attempted to be run as direct table 
access // instead of being wrapped inside a query. With direct table access enabled, Druid can push down the join operation to // data servers. @@ -149,6 +150,7 @@ public class QueryContexts public static final boolean DEFAULT_ENABLE_JOIN_FILTER_PUSH_DOWN = true; public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true; public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false; + public static final CloneQueryMode DEFAULT_CLONE_QUERY_MODE = CloneQueryMode.EXCLUDECLONES; public static final boolean DEFAULT_ENABLE_REWRITE_JOIN_TO_FILTER = true; public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000; public static final boolean DEFAULT_ENABLE_SQL_JOIN_LEFT_SCAN_DIRECT = false; diff --git a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java index 64538f33d8dd..735fabe0f8b3 100644 --- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java +++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java @@ -144,6 +144,15 @@ public void testDefaultPlanTimeBoundarySql() ); } + @Test + public void testDefaultCloneQueryMode() + { + Assert.assertEquals( + CloneQueryMode.EXCLUDECLONES, + QueryContext.empty().getCloneQueryMode() + ); + } + @Test public void testCatalogValidationEnabled() { diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 6eaea5796f52..9e13625b9bb8 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -75,6 +75,7 @@ public class BrokerServerView implements TimelineServerView private final Predicate> segmentFilter; private final CountDownLatch initialized = new CountDownLatch(1); private final FilteredServerInventoryView baseView; + private final 
BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig; @Inject public BrokerServerView( @@ -82,13 +83,15 @@ public BrokerServerView( final FilteredServerInventoryView baseView, final TierSelectorStrategy tierSelectorStrategy, final ServiceEmitter emitter, - final BrokerSegmentWatcherConfig segmentWatcherConfig + final BrokerSegmentWatcherConfig segmentWatcherConfig, + final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig ) { this.druidClientFactory = directDruidClientFactory; this.baseView = baseView; this.tierSelectorStrategy = tierSelectorStrategy; this.emitter = emitter; + this.brokerViewOfCoordinatorConfig = brokerViewOfCoordinatorConfig; // Validate and set the segment watcher config validateSegmentWatcherConfig(segmentWatcherConfig); @@ -253,7 +256,7 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm log.debug("Adding segment[%s] for server[%s]", segment, server); ServerSelector selector = selectors.get(segmentId); if (selector == null) { - selector = new ServerSelector(segment, tierSelectorStrategy); + selector = new ServerSelector(segment, tierSelectorStrategy, brokerViewOfCoordinatorConfig); VersionedIntervalTimeline timeline = timelines.get(segment.getDataSource()); if (timeline == null) { diff --git a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java new file mode 100644 index 000000000000..5334fff0731b --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.errorprone.annotations.concurrent.GuardedBy; +import com.google.inject.Inject; +import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import org.apache.druid.client.coordinator.Coordinator; +import org.apache.druid.client.coordinator.CoordinatorClient; +import org.apache.druid.client.coordinator.CoordinatorClientImpl; +import org.apache.druid.client.selector.HistoricalFilter; +import org.apache.druid.discovery.NodeRole; +import org.apache.druid.error.DruidException; +import org.apache.druid.guice.annotations.EscalatedGlobal; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.CloneQueryMode; +import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.rpc.ServiceLocator; +import org.apache.druid.rpc.StandardRetryPolicy; +import org.apache.druid.server.BrokerDynamicConfigResource; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; + +import javax.validation.constraints.NotNull; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Broker view of 
the coordinator dynamic configuration, and its derived values such as target and source clone servers. + * This class is registered as a managed lifecycle to fetch the coordinator dynamic configuration on startup. Further + * updates are handled through {@link BrokerDynamicConfigResource}. + */ +public class BrokerViewOfCoordinatorConfig implements HistoricalFilter +{ + private static final Logger log = new Logger(BrokerViewOfCoordinatorConfig.class); + private final CoordinatorClient coordinatorClient; + + @GuardedBy("this") + private CoordinatorDynamicConfig config; + @GuardedBy("this") + private Set targetCloneServers; + @GuardedBy("this") + private Set sourceCloneServers; + + @Inject + public BrokerViewOfCoordinatorConfig( + @Json final ObjectMapper jsonMapper, + @EscalatedGlobal final ServiceClientFactory clientFactory, + @Coordinator final ServiceLocator serviceLocator + ) + { + this.coordinatorClient = + new CoordinatorClientImpl( + clientFactory.makeClient( + NodeRole.COORDINATOR.getJsonName(), + serviceLocator, + StandardRetryPolicy.builder().maxAttempts(15).build() + ), + jsonMapper + ); + } + + @VisibleForTesting + public BrokerViewOfCoordinatorConfig(CoordinatorClient coordinatorClient) + { + this.coordinatorClient = coordinatorClient; + } + + /** + * Return the latest {@link CoordinatorDynamicConfig}. + */ + public synchronized CoordinatorDynamicConfig getDynamicConfig() + { + return config; + } + + /** + * Update the config view with a new coordinator dynamic config snapshot. Also updates the source and target clone + * servers based on the new dynamic configuration. 
+ */ + public synchronized void setDynamicConfig(@NotNull CoordinatorDynamicConfig updatedConfig) + { + config = updatedConfig; + final Map cloneServers = config.getCloneServers(); + this.targetCloneServers = ImmutableSet.copyOf(cloneServers.keySet()); + this.sourceCloneServers = ImmutableSet.copyOf(cloneServers.values()); + } + + @LifecycleStart + public void start() + { + try { + log.info("Fetching coordinator dynamic configuration."); + + CoordinatorDynamicConfig coordinatorDynamicConfig = coordinatorClient.getCoordinatorDynamicConfig().get(); + setDynamicConfig(coordinatorDynamicConfig); + + log.info("Successfully fetched coordinator dynamic config[%s].", coordinatorDynamicConfig); + } + catch (Exception e) { + // If the fetch fails, the broker should not serve queries. Throw the exception and try again on restart. + throw new RuntimeException("Failed to initialize coordinator dynamic config", e); + } + } + + @Override + public Int2ObjectRBTreeMap> getQueryableServers( + Int2ObjectRBTreeMap> historicalServers, + CloneQueryMode mode + ) + { + final Set serversToIgnore = getCurrentServersToIgnore(mode); + + if (serversToIgnore.isEmpty()) { + return historicalServers; + } + + final Int2ObjectRBTreeMap> filteredHistoricals = new Int2ObjectRBTreeMap<>(); + for (int priority : historicalServers.keySet()) { + Set servers = historicalServers.get(priority); + filteredHistoricals.put(priority, + servers.stream() + .filter(server -> !serversToIgnore.contains(server.getServer().getHost())) + .collect(Collectors.toSet()) + ); + } + + return filteredHistoricals; + } + + /** + * Get the list of servers that should not be queried based on the cloneQueryMode parameter. + */ + private synchronized Set getCurrentServersToIgnore(CloneQueryMode cloneQueryMode) + { + switch (cloneQueryMode) { + case PREFERCLONES: + // Remove servers being cloned targets, so that clones are queried. 
+ return sourceCloneServers; + case EXCLUDECLONES: + // Remove clones, so that only source servers are queried. + return targetCloneServers; + case INCLUDECLONES: + // Don't remove either. + return Set.of(); + default: + throw DruidException.defensive("Unexpected value: [%s]", cloneQueryMode); + } + } +} diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index 5594f52af7b6..e0c7f0e4e5f9 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -57,6 +57,7 @@ import org.apache.druid.query.BrokerParallelMergeConfig; import org.apache.druid.query.BySegmentResultValueClass; import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.Queries; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; @@ -343,13 +344,14 @@ ClusterQueryResult run( } final Set segmentServers = computeSegmentsToQuery(timeline, specificSegments); + final CloneQueryMode cloneQueryMode = query.context().getCloneQueryMode(); @Nullable final byte[] queryCacheKey = cacheKeyManager.computeSegmentLevelQueryCacheKey(); @Nullable final String prevEtag = (String) query.getContext().get(QueryResource.HEADER_IF_NONE_MATCH); if (prevEtag != null) { @Nullable - final String currentEtag = cacheKeyManager.computeResultLevelCachingEtag(segmentServers, queryCacheKey); + final String currentEtag = cacheKeyManager.computeResultLevelCachingEtag(segmentServers, cloneQueryMode, queryCacheKey); if (null != currentEtag) { responseContext.putEntityTag(currentEtag); } @@ -366,7 +368,10 @@ ClusterQueryResult run( queryPlus = queryPlus.withQueryMetrics(toolChest); queryPlus.getQueryMetrics().reportQueriedSegmentCount(segmentServers.size()).emit(emitter); - final SortedMap> segmentsByServer = 
groupSegmentsByServer(segmentServers); + final SortedMap> segmentsByServer = groupSegmentsByServer( + segmentServers, + cloneQueryMode + ); LazySequence mergedResultSequence = new LazySequence<>(() -> { List> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size()); addSequencesFromCache(sequencesByInterval, alreadyCachedResults); @@ -594,11 +599,15 @@ private Cache.NamedKey getCachePopulatorKey(String segmentId, Interval segmentIn return cachePopulatorKeyMap.get(StringUtils.format("%s_%s", segmentId, segmentInterval)); } - private SortedMap> groupSegmentsByServer(Set segments) + private SortedMap> groupSegmentsByServer( + Set segments, + CloneQueryMode cloneQueryMode + ) { final SortedMap> serverSegments = new TreeMap<>(); for (SegmentServerSelector segmentServer : segments) { - final QueryableDruidServer queryableDruidServer = segmentServer.getServer().pick(query); + final QueryableDruidServer queryableDruidServer = segmentServer.getServer() + .pick(query, cloneQueryMode); if (queryableDruidServer == null) { log.makeAlert( @@ -810,13 +819,14 @@ byte[] computeSegmentLevelQueryCacheKey() @Nullable String computeResultLevelCachingEtag( final Set segments, + final CloneQueryMode cloneQueryMode, @Nullable byte[] queryCacheKey ) { Hasher hasher = Hashing.sha1().newHasher(); boolean hasOnlyHistoricalSegments = true; for (SegmentServerSelector p : segments) { - QueryableDruidServer queryableServer = p.getServer().pick(query); + QueryableDruidServer queryableServer = p.getServer().pick(query, cloneQueryMode); if (queryableServer == null || !queryableServer.getServer().isSegmentReplicationTarget()) { hasOnlyHistoricalSegments = false; break; diff --git a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java index 914d51459d70..6369833000b5 100644 --- a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java +++ 
b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java @@ -20,6 +20,7 @@ package org.apache.druid.client; import org.apache.druid.client.selector.ServerSelector; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.LocatedSegmentDescriptor; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.TableDataSource; @@ -42,17 +43,19 @@ public static List getTargetLocations( TimelineServerView serverView, String datasource, List intervals, - int numCandidates + int numCandidates, + CloneQueryMode cloneQueryMode ) { - return getTargetLocations(serverView, new TableDataSource(datasource), intervals, numCandidates); + return getTargetLocations(serverView, new TableDataSource(datasource), intervals, numCandidates, cloneQueryMode); } public static List getTargetLocations( TimelineServerView serverView, TableDataSource datasource, List intervals, - int numCandidates + int numCandidates, + CloneQueryMode cloneQueryMode ) { final Optional> maybeTimeline = serverView.getTimeline(datasource); @@ -68,7 +71,7 @@ public static List getTargetLocations( holder.getInterval(), holder.getVersion(), chunk.getChunkNumber() ); long size = selector.getSegment().getSize(); - List candidates = selector.getCandidates(numCandidates); + List candidates = selector.getCandidates(numCandidates, cloneQueryMode); located.add(new LocatedSegmentDescriptor(descriptor, size, candidates)); } } diff --git a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java index ea370572c447..7faa2ecd9658 100644 --- a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java +++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java @@ -23,6 +23,7 @@ import org.apache.druid.query.explain.ExplainPlan; import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.query.http.SqlTaskStatus; +import 
org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import java.util.List; @@ -53,4 +54,9 @@ public interface BrokerClient * @param sqlQuery the SQL query for which the {@code EXPLAIN PLAN FOR} information is to be fetched */ ListenableFuture> fetchExplainPlan(ClientSqlQuery sqlQuery); + + /** + * Updates the broker with the given {@link CoordinatorDynamicConfig}. + */ + ListenableFuture updateCoordinatorDynamicConfig(CoordinatorDynamicConfig config); } diff --git a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java index 5ad609147429..3bce514c9801 100644 --- a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java +++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java @@ -33,7 +33,9 @@ import org.apache.druid.query.http.SqlTaskStatus; import org.apache.druid.rpc.RequestBuilder; import org.apache.druid.rpc.ServiceClient; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.nio.charset.StandardCharsets; import java.util.List; @@ -96,5 +98,21 @@ public ListenableFuture> fetchExplainPlan(final ClientSqlQuery holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), new TypeReference<>() {}) ); } + + @Override + public ListenableFuture updateCoordinatorDynamicConfig(CoordinatorDynamicConfig config) + { + final RequestBuilder requestBuilder = + new RequestBuilder(HttpMethod.POST, "/druid-internal/v1/config/coordinator") + .jsonContent(jsonMapper, config); + + return FutureUtils.transform( + client.asyncRequest(requestBuilder, new BytesFullResponseHandler()), + holder -> { + final HttpResponseStatus status = holder.getStatus(); + return status.equals(HttpResponseStatus.OK); + } + ); + } } diff --git 
a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index bcd3e4348e52..c4f112917c84 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -26,6 +26,7 @@ import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.segment.metadata.DataSourceInformation; import org.apache.druid.server.compaction.CompactionStatusResponse; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -87,4 +88,10 @@ public interface CoordinatorClient */ ListenableFuture getCompactionSnapshots(@Nullable String dataSource); + /** + * Gets the latest coordinator dynamic config. + *

+ * API: {@code GET /druid/coordinator/v1/config} + */ + ListenableFuture getCoordinatorDynamicConfig(); } diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java index 50cd58e0eb33..bb89bc965e8c 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClientImpl.java @@ -38,6 +38,7 @@ import org.apache.druid.segment.metadata.DataSourceInformation; import org.apache.druid.server.compaction.CompactionStatusResponse; import org.apache.druid.server.coordination.LoadableDataSegment; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.Interval; @@ -222,4 +223,20 @@ public ListenableFuture getCompactionSnapshots(@Nullab ) ); } + + @Override + public ListenableFuture getCoordinatorDynamicConfig() + { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.GET, "/druid/coordinator/v1/config"), + new BytesFullResponseHandler() + ), + holder -> JacksonUtils.readValue( + jsonMapper, + holder.getContent(), + CoordinatorDynamicConfig.class + ) + ); + } } diff --git a/server/src/main/java/org/apache/druid/client/selector/HistoricalFilter.java b/server/src/main/java/org/apache/druid/client/selector/HistoricalFilter.java new file mode 100644 index 000000000000..e6ea6cab07b8 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/selector/HistoricalFilter.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.selector; + +import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import org.apache.druid.client.QueryableDruidServer; +import org.apache.druid.query.CloneQueryMode; + +import java.util.Set; + +/** + * Interface that denotes some sort of filtering on the historicals, based on {@link CloneQueryMode}. + */ +public interface HistoricalFilter +{ + /** + * Perform no filtering, regardless of the query mode. + */ + HistoricalFilter IDENTITY_FILTER = (historicalServers, mode) -> historicalServers; + + /** + * Returns a {@link Int2ObjectRBTreeMap} after performing a filtering on the {@link QueryableDruidServer}, based + on the cloneQueryMode parameter. The map in the parameter is not modified. 
+ */ + Int2ObjectRBTreeMap> getQueryableServers( + Int2ObjectRBTreeMap> historicalServers, + CloneQueryMode mode + ); +} diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java index 31793d1103ff..1d42ff3dc9ee 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java @@ -23,6 +23,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; import org.apache.druid.client.DataSegmentInterner; import org.apache.druid.client.QueryableDruidServer; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.Query; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; @@ -51,15 +52,19 @@ public class ServerSelector implements Overshadowable private final AtomicReference segment; + private final HistoricalFilter filter; + public ServerSelector( DataSegment segment, - TierSelectorStrategy strategy + TierSelectorStrategy strategy, + HistoricalFilter filter ) { this.segment = new AtomicReference<>(DataSegmentInterner.intern(segment)); this.strategy = strategy; this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); + this.filter = filter; } public DataSegment getSegment() @@ -121,13 +126,16 @@ public boolean isEmpty() } } - public List getCandidates(final int numCandidates) + public List getCandidates( + final int numCandidates, + final CloneQueryMode cloneQueryMode + ) { List candidates; synchronized (this) { if (numCandidates > 0) { candidates = new ArrayList<>(numCandidates); - strategy.pick(historicalServers, segment.get(), numCandidates) + strategy.pick(filter.getQueryableServers(historicalServers, cloneQueryMode), segment.get(), numCandidates) .stream() .map(server -> 
server.getServer().getMetadata()) .forEach(candidates::add); @@ -140,21 +148,22 @@ public List getCandidates(final int numCandidates) } return candidates; } else { - return getAllServers(); + return getAllServers(cloneQueryMode); } } } - public List getAllServers() + public List getAllServers(CloneQueryMode cloneQueryMode) { final List servers = new ArrayList<>(); synchronized (this) { - historicalServers.values() - .stream() - .flatMap(Collection::stream) - .map(server -> server.getServer().getMetadata()) - .forEach(servers::add); + filter.getQueryableServers(historicalServers, cloneQueryMode) + .values() + .stream() + .flatMap(Collection::stream) + .map(server -> server.getServer().getMetadata()) + .forEach(servers::add); realtimeServers.values() .stream() @@ -167,11 +176,11 @@ public List getAllServers() } @Nullable - public QueryableDruidServer pick(@Nullable Query query) + public QueryableDruidServer pick(@Nullable Query query, CloneQueryMode cloneQueryMode) { synchronized (this) { if (!historicalServers.isEmpty()) { - return strategy.pick(query, historicalServers, segment.get()); + return strategy.pick(query, filter.getQueryableServers(historicalServers, cloneQueryMode), segment.get()); } return strategy.pick(query, realtimeServers, segment.get()); } diff --git a/server/src/main/java/org/apache/druid/server/BrokerDynamicConfigResource.java b/server/src/main/java/org/apache/druid/server/BrokerDynamicConfigResource.java new file mode 100644 index 000000000000..e29692fb993a --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/BrokerDynamicConfigResource.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import com.google.inject.Inject; +import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.client.BrokerViewOfCoordinatorConfig; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.http.security.ConfigResourceFilter; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +/** + * Resource for fetching and updating the {@link CoordinatorDynamicConfig} on the Broker. + */ +@Path("/druid-internal/v1/config") +public class BrokerDynamicConfigResource +{ + private final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig; + + @Inject + public BrokerDynamicConfigResource(BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig) + { + this.brokerViewOfCoordinatorConfig = brokerViewOfCoordinatorConfig; + } + + /** + * Returns the Broker's view of the {@link CoordinatorDynamicConfig}. 
+ */ + @GET + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(ConfigResourceFilter.class) + @Path("/coordinator") + public Response getDynamicConfig() + { + return Response.ok(brokerViewOfCoordinatorConfig.getDynamicConfig()).build(); + } + + /** + * Updates the {@link BrokerViewOfCoordinatorConfig} with the dynamicConfig parameter. + */ + @POST + @Consumes(MediaType.APPLICATION_JSON) + @ResourceFilters(ConfigResourceFilter.class) + @Path("/coordinator") + public Response setDynamicConfig(final CoordinatorDynamicConfig dynamicConfig) + { + brokerViewOfCoordinatorConfig.setDynamicConfig(dynamicConfig); + return Response.ok().build(); + } +} diff --git a/server/src/main/java/org/apache/druid/server/BrokerQueryResource.java b/server/src/main/java/org/apache/druid/server/BrokerQueryResource.java index d1b034db89ba..9144d3f903dd 100644 --- a/server/src/main/java/org/apache/druid/server/BrokerQueryResource.java +++ b/server/src/main/java/org/apache/druid/server/BrokerQueryResource.java @@ -28,12 +28,15 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Self; import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.planning.ExecutionVertex; import org.apache.druid.server.http.security.StateResourceFilter; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthorizerMapper; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -89,10 +92,17 @@ public Response getQueryTargets( InputStream in, @QueryParam("pretty") String pretty, @QueryParam("numCandidates") @DefaultValue("-1") int numCandidates, + @QueryParam("cloneQueryMode") @Nullable String cloneQueryModeString, @Context final HttpServletRequest req ) throws IOException { final 
ResourceIOReaderWriter ioReaderWriter = createResourceIOReaderWriter(req, pretty != null); + final CloneQueryMode cloneQueryMode = QueryContexts.getAsEnum( + QueryContexts.CLONE_QUERY_MODE, + cloneQueryModeString, + CloneQueryMode.class, + QueryContexts.DEFAULT_CLONE_QUERY_MODE + ); try { Query query = ioReaderWriter.getRequestMapper().readValue(in, Query.class); ExecutionVertex ev = ExecutionVertex.of(query); @@ -101,7 +111,8 @@ public Response getQueryTargets( brokerServerView, ev.getBaseTableDataSource(), ev.getEffectiveQuerySegmentSpec().getIntervals(), - numCandidates + numCandidates, + cloneQueryMode ) ); } diff --git a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java index 212b61990b7d..9e38aa3ab8a6 100644 --- a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java +++ b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java @@ -34,7 +34,9 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.LocatedSegmentDescriptor; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.metadata.SegmentMetadataQueryConfig; import org.apache.druid.server.http.security.DatasourceResourceFilter; @@ -49,6 +51,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -300,15 +303,22 @@ public Iterable getQueryTargets( @PathParam("dataSourceName") String datasource, @QueryParam("intervals") String intervals, @QueryParam("numCandidates") @DefaultValue("-1") int numCandidates, + @QueryParam("cloneQueryMode") @Nullable String cloneQueryModeString, @Context final 
HttpServletRequest req ) { + final CloneQueryMode cloneQueryMode = QueryContexts.getAsEnum( + QueryContexts.CLONE_QUERY_MODE, + cloneQueryModeString, + CloneQueryMode.class, + QueryContexts.DEFAULT_CLONE_QUERY_MODE + ); List intervalList = new ArrayList<>(); for (String interval : intervals.split(",")) { intervalList.add(Intervals.of(interval.trim())); } List condensed = JodaUtils.condenseIntervals(intervalList); - return ServerViewUtil.getTargetLocations(timelineServerView, datasource, condensed, numCandidates); + return ServerViewUtil.getTargetLocations(timelineServerView, datasource, condensed, numCandidates, cloneQueryMode); } protected DateTime getCurrentTime() diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 1b76660ed0be..4d284f442c3e 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.inject.Inject; @@ -607,7 +608,8 @@ private QueryRunner decorateClusterRunner(Query query, QueryRunner * @param parentQueryResourceId Parent Query's Query Resource ID * @return DataSource populated with the subqueries */ - private DataSource generateSubqueryIds( + @VisibleForTesting + public static DataSource generateSubqueryIds( DataSource rootDataSource, @Nullable final String parentQueryId, @Nullable final String parentSqlQueryId, @@ -654,7 +656,7 @@ private DataSource generateSubqueryIds( * @param parentQueryResourceId Parent query's resource Id * @return Populates the subqueries from the map */ - private DataSource insertSubqueryIds( + private static 
DataSource insertSubqueryIds( DataSource currentDataSource, Map> queryDataSourceToSubqueryIds, @Nullable final String parentQueryId, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java b/server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java new file mode 100644 index 000000000000..5983209fdf63 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/CloneStatusManager.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Manager to store and update the status of ongoing cloning operations. + */ +public class CloneStatusManager +{ + private final AtomicReference> cloneStatusSnapshot; + + public CloneStatusManager() + { + this.cloneStatusSnapshot = new AtomicReference<>(Map.of()); + } + + /** + * Returns the status of cloning as a list of {@link ServerCloneStatus}. 
+ */ + public List getStatusForAllServers() + { + return List.copyOf(cloneStatusSnapshot.get().values()); + } + + /** + * Returns the status of cloning as a {@link ServerCloneStatus} for a specific target server. + */ + @Nullable + public ServerCloneStatus getStatusForServer(String targetServer) + { + return cloneStatusSnapshot.get().get(targetServer); + } + + /** + * Updates the stored status with the provided parameter. + */ + public void updateStatus(Map newStatusMap) + { + cloneStatusSnapshot.set(ImmutableMap.copyOf(newStatusMap)); + } +} + diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index 8a219484acf1..f34c3b8ae39e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -370,6 +370,7 @@ public String toString() ", pauseCoordination=" + pauseCoordination + ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + ", turboLoadingNodes=" + turboLoadingNodes + + ", cloneServers=" + cloneServers + '}'; } @@ -393,6 +394,7 @@ public boolean equals(Object o) && replicateAfterLoadTimeout == that.replicateAfterLoadTimeout && maxSegmentsInNodeLoadingQueue == that.maxSegmentsInNodeLoadingQueue && useRoundRobinSegmentAssignment == that.useRoundRobinSegmentAssignment + && smartSegmentLoading == that.smartSegmentLoading && pauseCoordination == that.pauseCoordination && Objects.equals( specificDataSourcesToKillUnusedSegmentsIn, @@ -404,7 +406,8 @@ public boolean equals(Object o) that.dataSourcesToNotKillStalePendingSegmentsIn) && Objects.equals(decommissioningNodes, that.decommissioningNodes) && Objects.equals(turboLoadingNodes, that.turboLoadingNodes) - && Objects.equals(debugDimensions, that.debugDimensions); + && Objects.equals(debugDimensions, that.debugDimensions) + && 
Objects.equals(cloneServers, that.cloneServers); } @Override @@ -417,6 +420,9 @@ public int hashCode() replicationThrottleLimit, balancerComputeThreads, maxSegmentsInNodeLoadingQueue, + useRoundRobinSegmentAssignment, + smartSegmentLoading, + replicateAfterLoadTimeout, specificDataSourcesToKillUnusedSegmentsIn, killTaskSlotRatio, maxKillTaskSlots, @@ -424,7 +430,8 @@ public int hashCode() decommissioningNodes, pauseCoordination, debugDimensions, - turboLoadingNodes + turboLoadingNodes, + cloneServers ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 8174d829df1c..0a6ac7414459 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -94,6 +94,7 @@ import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.server.http.CoordinatorDynamicConfigSyncer; import org.apache.druid.server.http.SegmentsToUpdateFilter; import org.apache.druid.server.lookup.cache.LookupCoordinatorManager; import org.apache.druid.timeline.DataSegment; @@ -142,6 +143,8 @@ public class DruidCoordinator @Nullable private final CoordinatorSegmentMetadataCache coordinatorSegmentMetadataCache; private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig; + private final CoordinatorDynamicConfigSyncer coordinatorDynamicConfigSyncer; + private final CloneStatusManager cloneStatusManager; private volatile boolean started = false; @@ -186,7 +189,9 @@ public DruidCoordinator( @Coordinator DruidLeaderSelector coordLeaderSelector, @Nullable CoordinatorSegmentMetadataCache coordinatorSegmentMetadataCache, CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig, - CompactionStatusTracker 
compactionStatusTracker + CompactionStatusTracker compactionStatusTracker, + CoordinatorDynamicConfigSyncer coordinatorDynamicConfigSyncer, + CloneStatusManager cloneStatusManager ) { this.config = config; @@ -208,6 +213,8 @@ public DruidCoordinator( this.loadQueueManager = loadQueueManager; this.coordinatorSegmentMetadataCache = coordinatorSegmentMetadataCache; this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig; + this.coordinatorDynamicConfigSyncer = coordinatorDynamicConfigSyncer; + this.cloneStatusManager = cloneStatusManager; this.compactSegments = initializeCompactSegmentsDuty(this.compactionStatusTracker); } @@ -443,6 +450,7 @@ private void becomeLeader() if (coordinatorSegmentMetadataCache != null) { coordinatorSegmentMetadataCache.onLeaderStart(); } + coordinatorDynamicConfigSyncer.onLeaderStart(); final int startingLeaderCounter = coordLeaderSelector.localTerm(); dutiesRunnables.add( @@ -524,6 +532,7 @@ private void stopBeingLeader() } compactionStatusTracker.stop(); taskMaster.onLeaderStop(); + coordinatorDynamicConfigSyncer.onLeaderStop(); serviceAnnouncer.unannounce(self); lookupCoordinatorManager.stop(); metadataManager.onLeaderStop(); @@ -559,7 +568,7 @@ private List makeHistoricalManagementDuties() new MarkOvershadowedSegmentsAsUnused(deleteSegments), new MarkEternityTombstonesAsUnused(deleteSegments), new BalanceSegments(config.getCoordinatorPeriod()), - new CloneHistoricals(loadQueueManager), + new CloneHistoricals(loadQueueManager, cloneStatusManager), new CollectLoadQueueStats() ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerCloneStatus.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerCloneStatus.java new file mode 100644 index 000000000000..0d011fd1243e --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerCloneStatus.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Immutable class which represents the current status of a single clone server. + */ +public class ServerCloneStatus +{ + private final String sourceServer; + private final String targetServer; + private final State state; + private final long segmentLoadsRemaining; + private final long segmentDropsRemaining; + private final long bytesToLoad; + + @JsonCreator + public ServerCloneStatus( + @JsonProperty("sourceServer") String sourceServer, + @JsonProperty("targetServer") String targetServer, + @JsonProperty("state") State state, + @JsonProperty("segmentLoadsRemaining") long segmentLoadsRemaining, + @JsonProperty("segmentDropsRemaining") long segmentDropsRemaining, + @JsonProperty("bytesToLoad") long bytesToLoad + ) + { + this.sourceServer = sourceServer; + this.targetServer = targetServer; + this.state = state; + this.segmentLoadsRemaining = segmentLoadsRemaining; + this.segmentDropsRemaining = segmentDropsRemaining; + this.bytesToLoad = bytesToLoad; + } + + @JsonProperty + public String getSourceServer() + { + return sourceServer; + } + + @JsonProperty + public 
String getTargetServer() + { + return targetServer; + } + + @JsonProperty + public long getSegmentLoadsRemaining() + { + return segmentLoadsRemaining; + } + + @JsonProperty + public long getSegmentDropsRemaining() + { + return segmentDropsRemaining; + } + + @JsonProperty + public long getBytesToLoad() + { + return bytesToLoad; + } + + @JsonProperty + public State getState() + { + return state; + } + + /** + * Create a {@link ServerCloneStatus} where the current status is unknown as the target server is missing. + */ + public static ServerCloneStatus unknown(String sourceServer, String targetServer) + { + return new ServerCloneStatus(sourceServer, targetServer, State.TARGET_SERVER_MISSING, -1, -1, -1); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerCloneStatus that = (ServerCloneStatus) o; + return segmentLoadsRemaining == that.segmentLoadsRemaining + && segmentDropsRemaining == that.segmentDropsRemaining + && bytesToLoad == that.bytesToLoad + && Objects.equals(sourceServer, that.sourceServer) + && Objects.equals(targetServer, that.targetServer) + && state == that.state; + } + + @Override + public int hashCode() + { + return Objects.hash( + sourceServer, + targetServer, + state, + segmentLoadsRemaining, + segmentDropsRemaining, + bytesToLoad + ); + } + + @Override + public String toString() + { + return "ServerCloneStatus{" + + "sourceServer='" + sourceServer + '\'' + + ", targetServer='" + targetServer + '\'' + + ", state=" + state + + ", segmentLoadsRemaining=" + segmentLoadsRemaining + + ", segmentDropsRemaining=" + segmentDropsRemaining + + ", bytesToLoad=" + bytesToLoad + + '}'; + } + + /** + * Enum determining the status of the cloning process. + */ + public enum State + { + /** + * The source server is missing from the current cluster view. The clone is continuing to load segments based on the + * last seen state of the source cluster. 
+ */ + SOURCE_SERVER_MISSING, + /** + * The target server is missing from the current cluster view. + */ + TARGET_SERVER_MISSING, + /** + * Segments are loaded or being loaded. The counts give a better view of the progress. + */ + IN_PROGRESS + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java index 25534f1ddfbd..939686256eea 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CloneHistoricals.java @@ -20,8 +20,11 @@ package org.apache.druid.server.coordinator.duty; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.coordinator.CloneStatusManager; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.DruidCluster; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.ServerCloneStatus; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.server.coordinator.loading.SegmentAction; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; @@ -32,6 +35,7 @@ import org.apache.druid.timeline.DataSegment; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -45,10 +49,15 @@ public class CloneHistoricals implements CoordinatorDuty { private static final Logger log = new Logger(CloneHistoricals.class); private final SegmentLoadQueueManager loadQueueManager; + private final CloneStatusManager cloneStatusManager; - public CloneHistoricals(SegmentLoadQueueManager loadQueueManager) + public CloneHistoricals( + final SegmentLoadQueueManager loadQueueManager, + final CloneStatusManager cloneStatusManager + ) { this.loadQueueManager = 
loadQueueManager; + this.cloneStatusManager = cloneStatusManager; } @Override @@ -56,6 +65,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { final Map cloneServers = params.getCoordinatorDynamicConfig().getCloneServers(); final CoordinatorRunStats stats = params.getCoordinatorStats(); + final DruidCluster cluster = params.getDruidCluster(); if (cloneServers.isEmpty()) { // No servers to be cloned. @@ -63,22 +73,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } // Create a map of host to historical. - final Map historicalMap = params.getDruidCluster() - .getHistoricals() - .values() - .stream() - .flatMap(Collection::stream) - .collect(Collectors.toMap( - serverHolder -> serverHolder.getServer().getHost(), - serverHolder -> serverHolder - )); + final Map hostToHistoricalMap = cluster.getHistoricals() + .values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toMap( + serverHolder -> serverHolder.getServer().getHost(), + serverHolder -> serverHolder + )); for (Map.Entry entry : cloneServers.entrySet()) { final String targetHistoricalName = entry.getKey(); - final ServerHolder targetServer = historicalMap.get(targetHistoricalName); + final ServerHolder targetServer = hostToHistoricalMap.get(targetHistoricalName); final String sourceHistoricalName = entry.getValue(); - final ServerHolder sourceServer = historicalMap.get(sourceHistoricalName); + final ServerHolder sourceServer = hostToHistoricalMap.get(sourceHistoricalName); if (sourceServer == null || targetServer == null) { log.error( @@ -114,6 +123,56 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } } + final Map newStatusMap = createCurrentStatusMap(hostToHistoricalMap, cloneServers); + cloneStatusManager.updateStatus(newStatusMap); + return params; } + + /** + * Create a status map of cloning progress based on the cloneServers mapping and its current load queue. 
+ */ + private Map createCurrentStatusMap( + Map historicalMap, + Map cloneServers + ) + { + final Map newStatusMap = new HashMap<>(); + + for (Map.Entry entry : cloneServers.entrySet()) { + final String targetServerName = entry.getKey(); + final ServerHolder targetServer = historicalMap.get(entry.getKey()); + final String sourceServerName = entry.getValue(); + + long segmentLoad = 0L; + long bytesLeft = 0L; + long segmentDrop = 0L; + + ServerCloneStatus newStatus; + if (targetServer == null) { + newStatus = ServerCloneStatus.unknown(sourceServerName, targetServerName); + } else { + + ServerCloneStatus.State state; + if (!historicalMap.containsKey(sourceServerName)) { + state = ServerCloneStatus.State.SOURCE_SERVER_MISSING; + } else { + state = ServerCloneStatus.State.IN_PROGRESS; + } + + for (Map.Entry queuedSegment : targetServer.getQueuedSegments().entrySet()) { + if (queuedSegment.getValue().isLoad()) { + segmentLoad += 1; + bytesLeft += queuedSegment.getKey().getSize(); + } else { + segmentDrop += 1; + } + } + newStatus = new ServerCloneStatus(sourceServerName, targetServerName, state, segmentLoad, segmentDrop, bytesLeft); + } + newStatusMap.put(targetServerName, newStatus); + } + + return newStatusMap; + } } diff --git a/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java b/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java new file mode 100644 index 000000000000..483be1db5bd4 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/http/BrokerSyncStatus.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.server.http;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.rpc.ServiceLocation;

import java.util.Objects;

/**
 * Immutable snapshot of the dynamic configuration sync state for a single Broker.
 * <p>
 * Identity (equals/hashCode) is intentionally based only on host and port, so that a
 * re-synced broker replaces its previous entry regardless of {@code syncTimeInMs}.
 */
public class BrokerSyncStatus
{
  private final String host;
  private final int port;
  private final long syncTimeInMs;

  @JsonCreator
  public BrokerSyncStatus(
      @JsonProperty("host") String host,
      @JsonProperty("port") int port,
      @JsonProperty("syncTimeInMs") long syncTimeInMs
  )
  {
    this.host = host;
    this.port = port;
    this.syncTimeInMs = syncTimeInMs;
  }

  /**
   * Convenience constructor from a {@link ServiceLocation}. Prefers the TLS port when one is
   * configured (i.e. greater than zero), falling back to the plaintext port otherwise.
   */
  public BrokerSyncStatus(ServiceLocation broker, long syncTimeInMs)
  {
    this(
        broker.getHost(),
        broker.getTlsPort() > 0 ? broker.getTlsPort() : broker.getPlaintextPort(),
        syncTimeInMs
    );
  }

  @JsonProperty
  public String getHost()
  {
    return host;
  }

  @JsonProperty
  public int getPort()
  {
    return port;
  }

  @JsonProperty
  public long getSyncTimeInMs()
  {
    return syncTimeInMs;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final BrokerSyncStatus other = (BrokerSyncStatus) o;
    // syncTimeInMs is deliberately excluded from equality.
    return port == other.port && Objects.equals(host, other.host);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(host, port);
  }

  @Override
  public String toString()
  {
    return "BrokerSyncStatus{"
           + "host='" + host + '\''
           + ", port=" + port
           + ", syncTimeInMs=" + syncTimeInMs
           + '}';
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.server.http;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.coordinator.ServerCloneStatus;

import java.util.List;
import java.util.Objects;

/**
 * Immutable class containing the {@link ServerCloneStatus} for every server currently being
 * cloned, as tracked by the Coordinator.
 */
public class CloneStatus
{
  private final List<ServerCloneStatus> cloneStatus;

  @JsonCreator
  public CloneStatus(@JsonProperty("cloneStatus") List<ServerCloneStatus> cloneStatus)
  {
    this.cloneStatus = cloneStatus;
  }

  @JsonProperty
  public List<ServerCloneStatus> getCloneStatus()
  {
    return cloneStatus;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    CloneStatus that = (CloneStatus) o;
    return Objects.equals(cloneStatus, that.cloneStatus);
  }

  @Override
  public int hashCode()
  {
    return Objects.hashCode(cloneStatus);
  }

  @Override
  public String toString()
  {
    return "CloneStatus{"
           + "cloneStatus=" + cloneStatus
           + '}';
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.server.http;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

import java.util.Objects;
import java.util.Set;

/**
 * Immutable class which contains the current set of Brokers which have been synced with the latest
 * {@link CoordinatorDynamicConfig}.
 */
public class ConfigSyncStatus
{
  private final Set<BrokerSyncStatus> syncedBrokers;

  @JsonCreator
  public ConfigSyncStatus(@JsonProperty("syncedBrokers") Set<BrokerSyncStatus> syncedBrokers)
  {
    this.syncedBrokers = syncedBrokers;
  }

  @JsonProperty
  public Set<BrokerSyncStatus> getSyncedBrokers()
  {
    return syncedBrokers;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final ConfigSyncStatus other = (ConfigSyncStatus) o;
    return Objects.equals(syncedBrokers, other.syncedBrokers);
  }

  @Override
  public int hashCode()
  {
    return Objects.hashCode(syncedBrokers);
  }

  @Override
  public String toString()
  {
    return "ConfigSyncStatus{"
           + "syncedBrokers=" + syncedBrokers
           + '}';
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.server.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.client.broker.BrokerClient;
import org.apache.druid.client.broker.BrokerClientImpl;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.FixedServiceLocator;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorConfigManager;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;

import javax.annotation.Nullable;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Updates all brokers with the latest coordinator dynamic config.
 * <p>
 * Syncs are performed on a single-threaded executor: periodically while this coordinator is the
 * leader, and on demand via {@link #queueBroadcastConfigToBrokers()} after a config update.
 */
public class CoordinatorDynamicConfigSyncer
{
  private static final Logger log = new Logger(CoordinatorDynamicConfigSyncer.class);

  private final CoordinatorConfigManager configManager;
  private final ObjectMapper jsonMapper;
  private final DruidNodeDiscoveryProvider druidNodeDiscovery;

  private final ServiceClientFactory clientFactory;
  private final ScheduledExecutorService exec;

  // Guarded to avoid races between onLeaderStart/onLeaderStop and to prevent leaking a
  // previously scheduled future if leadership is gained more than once.
  @GuardedBy("this")
  @Nullable
  private Future<?> syncFuture = null;

  @GuardedBy("this")
  private final Set<BrokerSyncStatus> inSyncBrokers;
  private final AtomicReference<CoordinatorDynamicConfig> lastKnownConfig = new AtomicReference<>();

  @Inject
  public CoordinatorDynamicConfigSyncer(
      @EscalatedGlobal final ServiceClientFactory clientFactory,
      final CoordinatorConfigManager configManager,
      @Json final ObjectMapper jsonMapper,
      final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider
  )
  {
    this.clientFactory = clientFactory;
    this.configManager = configManager;
    this.jsonMapper = jsonMapper;
    this.druidNodeDiscovery = druidNodeDiscoveryProvider;
    this.exec = Execs.scheduledSingleThreaded("CoordinatorDynamicConfigSyncer-%d");
    this.inSyncBrokers = ConcurrentHashMap.newKeySet();
  }

  /**
   * Queues the configuration sync to the brokers without blocking the calling thread.
   */
  public void queueBroadcastConfigToBrokers()
  {
    exec.submit(this::broadcastConfigToBrokers);
  }

  /**
   * Push the latest coordinator dynamic config, provided by the configManager, to all currently
   * known Brokers. Also invalidates the set of inSyncBrokers if the config has changed.
   */
  private void broadcastConfigToBrokers()
  {
    invalidateInSyncBrokersIfNeeded();
    for (ServiceLocation broker : getKnownBrokers()) {
      pushConfigToBroker(broker);
    }
  }

  /**
   * Returns the set of Brokers which have been updated with the latest {@link CoordinatorDynamicConfig}.
   */
  public synchronized Set<BrokerSyncStatus> getInSyncBrokers()
  {
    return Set.copyOf(inSyncBrokers);
  }

  /**
   * Schedules a periodic sync with brokers when the coordinator becomes the leader.
   */
  public synchronized void onLeaderStart()
  {
    log.info("Starting coordinator config syncing to brokers on leader node.");
    if (syncFuture != null) {
      // Defensive: cancel any previously scheduled sync so repeated leadership changes
      // do not accumulate periodic tasks.
      syncFuture.cancel(true);
    }
    syncFuture = exec.scheduleAtFixedRate(
        this::broadcastConfigToBrokers,
        30L,
        60L,
        TimeUnit.SECONDS
    );
  }

  /**
   * Stops the sync when coordinator stops being the leader.
   */
  public synchronized void onLeaderStop()
  {
    log.info("Not leader, stopping coordinator config syncing to brokers.");
    if (syncFuture != null) {
      syncFuture.cancel(true);
      syncFuture = null;
    }
  }

  /**
   * Push the latest coordinator dynamic config, provided by the configManager, to the Broker at
   * the given location. Failures are logged and ignored; the next sync will retry.
   */
  private void pushConfigToBroker(ServiceLocation brokerLocation)
  {
    final BrokerClient brokerClient = new BrokerClientImpl(
        clientFactory.makeClient(
            NodeRole.BROKER.getJsonName(),
            new FixedServiceLocator(brokerLocation),
            StandardRetryPolicy.builder().maxAttempts(6).build()
        ),
        jsonMapper
    );

    try {
      CoordinatorDynamicConfig currentDynamicConfig = configManager.getCurrentDynamicConfig();
      boolean success = brokerClient.updateCoordinatorDynamicConfig(currentDynamicConfig).get();
      if (success) {
        markBrokerAsSynced(currentDynamicConfig, brokerLocation);
      }
    }
    catch (Exception e) {
      // Catch and ignore the exception, wait for the next sync.
      log.error(e, "Exception while syncing dynamic configuration to broker[%s]", brokerLocation);
    }
  }

  /**
   * Returns the {@link ServiceLocation}s of all brokers currently known to the druidNodeDiscovery.
   * Nodes without a resolvable {@link DruidNode} are skipped.
   */
  private Set<ServiceLocation> getKnownBrokers()
  {
    return druidNodeDiscovery.getForNodeRole(NodeRole.BROKER)
                             .getAllNodes()
                             .stream()
                             .map(CoordinatorDynamicConfigSyncer::convertDiscoveryNodeToServiceLocation)
                             .filter(Objects::nonNull)
                             .collect(Collectors.toSet());
  }

  /**
   * Clears the set of inSyncBrokers and updates the lastKnownConfig if the latest coordinator
   * dynamic config is different from the config tracked by this class.
   */
  private synchronized void invalidateInSyncBrokersIfNeeded()
  {
    final CoordinatorDynamicConfig currentDynamicConfig = configManager.getCurrentDynamicConfig();
    if (!currentDynamicConfig.equals(lastKnownConfig.get())) {
      // Config has changed, clear the inSync list.
      inSyncBrokers.clear();
      lastKnownConfig.set(currentDynamicConfig);
    }
  }

  /**
   * Adds a broker to the set of inSyncBrokers if the coordinator dynamic config has not changed
   * since the push started.
   */
  private synchronized void markBrokerAsSynced(CoordinatorDynamicConfig config, ServiceLocation broker)
  {
    if (config.equals(lastKnownConfig.get())) {
      inSyncBrokers.add(new BrokerSyncStatus(broker, System.currentTimeMillis()));
    }
  }

  /**
   * Utility method to convert {@link DiscoveryDruidNode} to a {@link ServiceLocation}.
   * Returns null when the discovery node carries no {@link DruidNode}.
   */
  @Nullable
  private static ServiceLocation convertDiscoveryNodeToServiceLocation(DiscoveryDruidNode discoveryDruidNode)
  {
    final DruidNode druidNode = discoveryDruidNode.getDruidNode();
    if (druidNode == null) {
      return null;
    }

    return new ServiceLocation(
        druidNode.getHost(),
        druidNode.getPlaintextPort(),
        druidNode.getTlsPort(),
        ""
    );
  }
}
b/server/src/main/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResource.java @@ -23,12 +23,16 @@ import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.server.coordinator.CloneStatusManager; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.ServerCloneStatus; import org.apache.druid.server.http.security.ConfigResourceFilter; +import org.apache.druid.server.http.security.StateResourceFilter; import org.apache.druid.server.security.AuthorizationUtils; import org.joda.time.Interval; +import javax.annotation.Nullable; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -49,15 +53,21 @@ public class CoordinatorDynamicConfigsResource { private final CoordinatorConfigManager manager; private final AuditManager auditManager; + private final CoordinatorDynamicConfigSyncer coordinatorDynamicConfigSyncer; + private final CloneStatusManager cloneStatusManager; @Inject public CoordinatorDynamicConfigsResource( CoordinatorConfigManager manager, - AuditManager auditManager + AuditManager auditManager, + CoordinatorDynamicConfigSyncer coordinatorDynamicConfigSyncer, + CloneStatusManager cloneStatusManager ) { this.manager = manager; this.auditManager = auditManager; + this.coordinatorDynamicConfigSyncer = coordinatorDynamicConfigSyncer; + this.cloneStatusManager = cloneStatusManager; } @GET @@ -84,6 +94,7 @@ public Response setDynamicConfigs( ); if (setResult.isOk()) { + coordinatorDynamicConfigSyncer.queueBroadcastConfigToBrokers(); return Response.ok().build(); } else { return Response.status(Response.Status.BAD_REQUEST) @@ -132,4 +143,30 @@ public Response getDatasourceRuleHistory( ).build(); } + @GET + @Path("/syncedBrokers") + 
@ResourceFilters(StateResourceFilter.class) + @Produces(MediaType.APPLICATION_JSON) + public Response getBrokerStatus() + { + return Response.ok(new ConfigSyncStatus(coordinatorDynamicConfigSyncer.getInSyncBrokers())).build(); + } + + @GET + @Path("/cloneStatus") + @ResourceFilters(StateResourceFilter.class) + @Produces(MediaType.APPLICATION_JSON) + public Response getCloneStatus(@QueryParam("targetServer") @Nullable String targetServer) + { + if (targetServer != null) { + final ServerCloneStatus statusForServer = cloneStatusManager.getStatusForServer(targetServer); + if (statusForServer == null) { + return Response.status(Response.Status.NOT_FOUND).build(); + } + return Response.ok(statusForServer).build(); + } else { + final CloneStatus statusForAllServers = new CloneStatus(cloneStatusManager.getStatusForAllServers()); + return Response.ok(statusForAllServers).build(); + } + } } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 0afde5424945..707d61b140ba 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.TableDataSource; @@ -45,6 +46,7 @@ import org.apache.druid.segment.realtime.appenderator.SegmentSchemas; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestCoordinatorClient; import org.apache.druid.server.initialization.ZkPathsConfig; import 
org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; @@ -79,17 +81,20 @@ public class BrokerServerViewTest extends CuratorTestBase private BatchServerInventoryView baseView; private BrokerServerView brokerServerView; + private BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig; public BrokerServerViewTest() { jsonMapper = TestHelper.makeJsonMapper(); zkPathsConfig = new ZkPathsConfig(); + brokerViewOfCoordinatorConfig = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient()); } @Before public void setUp() throws Exception { setupServerAndCurator(); + brokerViewOfCoordinatorConfig.start(); curator.start(); curator.blockUntilConnected(); } @@ -128,7 +133,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception ServerSelector selector = (actualPartitionHolder.iterator().next()).getObject(); Assert.assertFalse(selector.isEmpty()); Assert.assertEquals(segment, selector.getSegment()); - Assert.assertEquals(druidServer, selector.pick(null).getServer()); + Assert.assertEquals(druidServer, selector.pick(null, CloneQueryMode.EXCLUDECLONES).getServer()); Assert.assertNotNull(timeline.findChunk(intervals, "v1", partition)); unannounceSegmentForServer(druidServer, segment, zkPathsConfig); @@ -387,9 +392,9 @@ public void testMultipleTiers() throws Exception // Verify that the ServerSelector always picks Tier 1 for (int i = 0; i < 5; ++i) { - Assert.assertEquals(server21, selector.pick(null).getServer()); + Assert.assertEquals(server21, selector.pick(null, CloneQueryMode.EXCLUDECLONES).getServer()); } - Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); + Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2, CloneQueryMode.EXCLUDECLONES)); } @Test @@ -447,9 +452,9 @@ public void testRealtimeTasksNotWatched() throws Exception // Verify that the ServerSelector always picks the Historical server for (int i = 0; i < 5; 
++i) { - Assert.assertEquals(historicalServer, selector.pick(null).getServer()); + Assert.assertEquals(historicalServer, selector.pick(null, CloneQueryMode.EXCLUDECLONES).getServer()); } - Assert.assertEquals(Collections.singletonList(historicalServer.getMetadata()), selector.getCandidates(2)); + Assert.assertEquals(Collections.singletonList(historicalServer.getMetadata()), selector.getCandidates(2, CloneQueryMode.EXCLUDECLONES)); } @Test @@ -509,9 +514,9 @@ public void testIgnoredTiers() throws Exception // Verify that the ServerSelector always picks Tier 1 for (int i = 0; i < 5; ++i) { - Assert.assertEquals(server21, selector.pick(null).getServer()); + Assert.assertEquals(server21, selector.pick(null, CloneQueryMode.EXCLUDECLONES).getServer()); } - Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2)); + Assert.assertEquals(Collections.singletonList(server21.getMetadata()), selector.getCandidates(2, CloneQueryMode.EXCLUDECLONES)); } @Test(expected = ISE.class) @@ -591,7 +596,7 @@ private void assertValues( ServerSelector selector = ((SingleElementPartitionChunk) actualPartitionHolder.iterator() .next()).getObject(); Assert.assertFalse(selector.isEmpty()); - Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick(null).getServer()); + Assert.assertEquals(expectedPair.rhs.rhs.lhs, selector.pick(null, CloneQueryMode.EXCLUDECLONES).getServer()); Assert.assertEquals(expectedPair.rhs.rhs.rhs, selector.getSegment()); } } @@ -685,7 +690,8 @@ public Set getIgnoredTiers() { return ignoredTiers; } - } + }, + brokerViewOfCoordinatorConfig ); baseView.start(); diff --git a/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java new file mode 100644 index 000000000000..5a014531bf54 --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java @@ -0,0 +1,58 @@ +/* + * Licensed 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.client;

import com.google.common.util.concurrent.Futures;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.Map;

public class BrokerViewOfCoordinatorConfigTest
{
  private BrokerViewOfCoordinatorConfig target;
  private CoordinatorClient coordinatorClient;
  private CoordinatorDynamicConfig config;

  @Before
  public void setUp() throws Exception
  {
    // Dynamic config with a single clone mapping, served by a mocked coordinator client.
    config = CoordinatorDynamicConfig.builder()
                                     .withCloneServers(Map.of("host1", "host2"))
                                     .build();
    coordinatorClient = Mockito.mock(CoordinatorClient.class);
    Mockito.when(coordinatorClient.getCoordinatorDynamicConfig())
           .thenReturn(Futures.immediateFuture(config));
    target = new BrokerViewOfCoordinatorConfig(coordinatorClient);
  }

  @Test
  public void testFetchesConfigOnStartup()
  {
    target.start();

    // start() must fetch the dynamic config from the coordinator exactly once and cache it.
    Mockito.verify(coordinatorClient, Mockito.times(1)).getCoordinatorDynamicConfig();
    Assert.assertEquals(config, target.getDynamicConfig());
  }
}
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java index 36e77dbaaec6..c27a0537ae2c 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java @@ -24,6 +24,7 @@ import com.google.common.primitives.Bytes; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.DataSource; import org.apache.druid.query.Query; import org.apache.druid.query.QueryContext; @@ -86,7 +87,7 @@ public void testComputeEtag_nonHistorical() makeHistoricalServerSelector(0), makeRealtimeServerSelector(1) ); - String actual = keyManager.computeResultLevelCachingEtag(selectors, QUERY_CACHE_KEY); + String actual = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, QUERY_CACHE_KEY); Assert.assertNull(actual); } @@ -99,14 +100,14 @@ public void testComputeEtag_DifferentHistoricals() makeHistoricalServerSelector(1), makeHistoricalServerSelector(1) ); - String actual1 = keyManager.computeResultLevelCachingEtag(selectors, QUERY_CACHE_KEY); + String actual1 = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, QUERY_CACHE_KEY); Assert.assertNotNull(actual1); selectors = ImmutableSet.of( makeHistoricalServerSelector(1), makeHistoricalServerSelector(1) ); - String actual2 = keyManager.computeResultLevelCachingEtag(selectors, QUERY_CACHE_KEY); + String actual2 = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, QUERY_CACHE_KEY); Assert.assertNotNull(actual2); Assert.assertEquals("cache key should not change for same server selectors", actual1, actual2); @@ -114,7 +115,7 @@ public void 
testComputeEtag_DifferentHistoricals() makeHistoricalServerSelector(2), makeHistoricalServerSelector(1) ); - String actual3 = keyManager.computeResultLevelCachingEtag(selectors, QUERY_CACHE_KEY); + String actual3 = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, QUERY_CACHE_KEY); Assert.assertNotNull(actual3); Assert.assertNotEquals(actual1, actual3); } @@ -128,10 +129,10 @@ public void testComputeEtag_DifferentQueryCacheKey() makeHistoricalServerSelector(1), makeHistoricalServerSelector(1) ); - String actual1 = keyManager.computeResultLevelCachingEtag(selectors, new byte[]{1, 2}); + String actual1 = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, new byte[]{1, 2}); Assert.assertNotNull(actual1); - String actual2 = keyManager.computeResultLevelCachingEtag(selectors, new byte[]{3, 4}); + String actual2 = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, new byte[]{3, 4}); Assert.assertNotNull(actual2); Assert.assertNotEquals(actual1, actual2); } @@ -146,14 +147,14 @@ public void testComputeEtag_nonJoinDataSource() makeHistoricalServerSelector(1), makeHistoricalServerSelector(1) ); - String actual1 = keyManager.computeResultLevelCachingEtag(selectors, FULL_QUERY_CACHE_KEY); + String actual1 = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, FULL_QUERY_CACHE_KEY); Assert.assertNotNull(actual1); selectors = ImmutableSet.of( makeHistoricalServerSelector(1), makeHistoricalServerSelector(1) ); - String actual2 = keyManager.computeResultLevelCachingEtag(selectors, null); + String actual2 = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, null); Assert.assertNotNull(actual2); Assert.assertEquals(actual1, actual2); } @@ -169,7 +170,7 @@ public void testComputeEtag_joinWithUnsupportedCaching() makeHistoricalServerSelector(1), makeHistoricalServerSelector(1) ); - String actual = 
keyManager.computeResultLevelCachingEtag(selectors, null); + String actual = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, null); Assert.assertNull(actual); } @@ -187,7 +188,7 @@ public void testComputeEtag_noEffectifBySegment() makeHistoricalServerSelector(1), makeHistoricalServerSelector(1) ); - String actual = keyManager.computeResultLevelCachingEtag(selectors, null); + String actual = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, null); Assert.assertNotNull(actual); } @@ -206,7 +207,7 @@ public void testComputeEtag_noEffectIfUseAndPopulateFalse() makeHistoricalServerSelector(1), makeHistoricalServerSelector(1) ); - String actual = keyManager.computeResultLevelCachingEtag(selectors, null); + String actual = keyManager.computeResultLevelCachingEtag(selectors, CloneQueryMode.EXCLUDECLONES, null); Assert.assertNotNull(actual); } @@ -303,7 +304,7 @@ private SegmentServerSelector makeServerSelector(boolean isHistorical, int parti 0 ); expect(server.isSegmentReplicationTarget()).andReturn(isHistorical).anyTimes(); - expect(serverSelector.pick(query)).andReturn(queryableDruidServer).anyTimes(); + expect(serverSelector.pick(query, CloneQueryMode.EXCLUDECLONES)).andReturn(queryableDruidServer).anyTimes(); expect(queryableDruidServer.getServer()).andReturn(server).anyTimes(); expect(serverSelector.getSegment()).andReturn(segment).anyTimes(); replay(serverSelector, queryableDruidServer, server); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 27fb1b49cc58..20972a6127a9 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -30,6 +30,7 @@ import org.apache.druid.client.cache.CachePopulatorStats; import 
org.apache.druid.client.cache.ForegroundCachePopulator; import org.apache.druid.client.cache.MapCache; +import org.apache.druid.client.selector.HistoricalFilter; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.client.selector.TierSelectorStrategy; import org.apache.druid.guice.http.DruidHttpClientConfig; @@ -212,7 +213,8 @@ public List pick( ) ); } - } + }, + HistoricalFilter.IDENTITY_FILTER ) )); } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java index d4611710e509..c944146cb5a9 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientPerfTest.java @@ -53,6 +53,7 @@ import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.coordination.ServerManagerTest; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestCoordinatorClient; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.VersionedIntervalTimeline; @@ -86,6 +87,8 @@ public void testGetQueryRunnerForSegments_singleIntervalLargeSegments() final List segmentDescriptors = new ArrayList<>(segmentCount); final List dataSegments = new ArrayList<>(segmentCount); final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + final BrokerViewOfCoordinatorConfig brokerViewOfCoordinatorConfig = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient()); + brokerViewOfCoordinatorConfig.start(); final DruidServer server = new DruidServer( "server", "localhost:9000", @@ -105,7 +108,8 @@ public void testGetQueryRunnerForSegments_singleIntervalLargeSegments() Iterators.transform(dataSegments.iterator(), segment -> { ServerSelector ss = new ServerSelector( segment, - 
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + brokerViewOfCoordinatorConfig ); ss.addServerAndUpdateSegment(new QueryableDruidServer( server, diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 8ea6e2e85cde..d42b44b2db6c 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -44,6 +44,7 @@ import org.apache.druid.client.cache.ForegroundCachePopulator; import org.apache.druid.client.cache.MapCache; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; +import org.apache.druid.client.selector.HistoricalFilter; import org.apache.druid.client.selector.RandomServerSelectorStrategy; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.guice.http.DruidHttpClientConfig; @@ -144,7 +145,6 @@ import org.junit.runners.Parameterized; import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -549,7 +549,8 @@ public void testCachingOverBulkLimitEnforcesLimit() EasyMock.replay(dataSegment); final ServerSelector selector = new ServerSelector( dataSegment, - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), dataSegment); timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector)); @@ -1752,7 +1753,8 @@ private ServerSelector makeMockHashBasedSelector( ServerSelector selector = new ServerSelector( segment, - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new 
HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); selector.addServerAndUpdateSegment(new QueryableDruidServer(server, null), segment); return selector; @@ -1785,7 +1787,8 @@ private ServerSelector makeMockSingleDimensionSelector( ServerSelector selector = new ServerSelector( segment, - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); selector.addServerAndUpdateSegment(new QueryableDruidServer(server, null), segment); return selector; @@ -2232,7 +2235,8 @@ private List> populateTimeline( EasyMock.replay(mockSegment); ServerSelector selector = new ServerSelector( expectation.getSegment(), - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), selector.getSegment()); EasyMock.reset(mockSegment); @@ -3037,7 +3041,8 @@ public void testIfNoneMatch() ); final ServerSelector selector = new ServerSelector( dataSegment, - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], null), dataSegment); timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector)); @@ -3078,7 +3083,8 @@ public void testEtagforDifferentQueryInterval() ); final ServerSelector selector = new ServerSelector( dataSegment, - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); selector.addServerAndUpdateSegment(new 
QueryableDruidServer(servers[0], null), dataSegment); timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector)); diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java index 1bdcec158268..a86a07c8f866 100644 --- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java @@ -38,6 +38,7 @@ import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.Druids; import org.apache.druid.query.Query; import org.apache.druid.query.QueryInterruptedException; @@ -48,6 +49,7 @@ import org.apache.druid.query.timeboundary.TimeBoundaryQuery; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestCoordinatorClient; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -103,10 +105,13 @@ public class DirectDruidClientTest @Before public void setup() { + final BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient()); + filter.start(); httpClient = EasyMock.createMock(HttpClient.class); serverSelector = new ServerSelector( dataSegment, - new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()), + filter ); queryCancellationExecutor = Execs.scheduledSingleThreaded("query-cancellation-executor"); client = new DirectDruidClient( @@ -243,7 +248,7 @@ public void testRun() throws Exception 
Assert.assertEquals(2, client2.getNumOpenConnections()); - Assert.assertEquals(serverSelector.pick(null), queryableDruidServer2); + Assert.assertEquals(serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES), queryableDruidServer2); EasyMock.verify(httpClient); } diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java b/server/src/test/java/org/apache/druid/client/SimpleServerView.java index 523bba766ba3..f7c93f2c9e27 100644 --- a/server/src/test/java/org/apache/druid/client/SimpleServerView.java +++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Ordering; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; +import org.apache.druid.client.selector.HistoricalFilter; import org.apache.druid.client.selector.RandomServerSelectorStrategy; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.client.selector.TierSelectorStrategy; @@ -113,7 +114,7 @@ private void addSegmentToServer(DruidServer server, DataSegment segment) { final ServerSelector selector = selectors.computeIfAbsent( segment.getId().toString(), - k -> new ServerSelector(segment, tierSelectorStrategy) + k -> new ServerSelector(segment, tierSelectorStrategy, HistoricalFilter.IDENTITY_FILTER) ); selector.addServerAndUpdateSegment(servers.get(server), segment); // broker needs to skip tombstones in its timelines diff --git a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java index 1cf7f86f352c..67180d9e1b2e 100644 --- a/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java +++ b/server/src/test/java/org/apache/druid/client/coordinator/CoordinatorClientImplTest.java @@ -44,6 +44,7 @@ import org.apache.druid.server.coordination.DruidServerMetadata; import 
org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.PruneLoadSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -423,4 +424,30 @@ public void test_getCompactionSnapshots_nonNullDataSource() throws Exception coordinatorClient.getCompactionSnapshots("ds1").get() ); } + + @Test + public void test_getCoordinatorDynamicConfig() throws Exception + { + CoordinatorDynamicConfig config = CoordinatorDynamicConfig + .builder() + .withMaxSegmentsToMove(105) + .withReplicantLifetime(500) + .withReplicationThrottleLimit(5) + .build(); + + serviceClient.expectAndRespond( + new RequestBuilder( + HttpMethod.GET, + "/druid/coordinator/v1/config" + ), + HttpResponseStatus.OK, + ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), + DefaultObjectMapper.INSTANCE.writeValueAsBytes(config) + ); + + Assert.assertEquals( + config, + coordinatorClient.getCoordinatorDynamicConfig().get() + ); + } } diff --git a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java index ac01db424afa..f65be567ff04 100644 --- a/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java +++ b/server/src/test/java/org/apache/druid/client/coordinator/NoopCoordinatorClient.java @@ -26,6 +26,7 @@ import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.segment.metadata.DataSourceInformation; import org.apache.druid.server.compaction.CompactionStatusResponse; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -90,4 +91,10 @@ public ListenableFuture getCompactionSnapshots(@Nullab throw new 
UnsupportedOperationException(); } + @Override + public ListenableFuture getCoordinatorDynamicConfig() + { + throw new UnsupportedOperationException(); + } + } diff --git a/server/src/test/java/org/apache/druid/client/selector/ConnectionCountServerSelectorStrategyTest.java b/server/src/test/java/org/apache/druid/client/selector/ConnectionCountServerSelectorStrategyTest.java index d914e9e59433..950d1f80556c 100644 --- a/server/src/test/java/org/apache/druid/client/selector/ConnectionCountServerSelectorStrategyTest.java +++ b/server/src/test/java/org/apache/druid/client/selector/ConnectionCountServerSelectorStrategyTest.java @@ -24,6 +24,7 @@ import org.apache.druid.client.QueryableDruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; @@ -50,7 +51,7 @@ public void testDifferentConnectionCount() ServerSelector serverSelector = initSelector(s1, s2, s3); for (int i = 0; i < 100; ++i) { - Assert.assertEquals(s2, serverSelector.pick(null)); + Assert.assertEquals(s2, serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES)); } } @@ -63,7 +64,7 @@ public void testBalancerTieBreaking() Set pickedServers = new HashSet<>(); for (int i = 0; i < 100; ++i) { - pickedServers.add(serverSelector.pick(null).getServer().getName()); + pickedServers.add(serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES).getServer().getName()); } Assert.assertTrue( "Multiple servers should be selected when the number of connections is equal.", @@ -103,7 +104,8 @@ private ServerSelector initSelector(QueryableDruidServer... 
servers) new NumberedShardSpec(0, 0), 0, 0L - ), strategy + ), strategy, + HistoricalFilter.IDENTITY_FILTER ); List serverList = new ArrayList<>(Arrays.asList(servers)); Collections.shuffle(serverList); diff --git a/server/src/test/java/org/apache/druid/client/selector/ServerSelectorTest.java b/server/src/test/java/org/apache/druid/client/selector/ServerSelectorTest.java index e652ca2c68c2..5b351b092f48 100644 --- a/server/src/test/java/org/apache/druid/client/selector/ServerSelectorTest.java +++ b/server/src/test/java/org/apache/druid/client/selector/ServerSelectorTest.java @@ -65,7 +65,8 @@ public void testSegmentUpdate() .binaryVersion(9) .size(0) .build(), - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); selector.addServerAndUpdateSegment( @@ -108,7 +109,8 @@ public void testSegmentCannotBeNull() { final ServerSelector selector = new ServerSelector( null, - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); } @@ -132,7 +134,8 @@ public void testSegmentWithNoData() .binaryVersion(9) .size(0) .build(), - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); Assert.assertFalse(selector.hasData()); } @@ -159,7 +162,8 @@ public void testSegmentWithData() .binaryVersion(9) .size(0) .build(), - new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) + new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), + HistoricalFilter.IDENTITY_FILTER ); Assert.assertTrue(selector.hasData()); } diff --git a/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java 
b/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java index e6afb1cba9cf..335b0ef7c502 100644 --- a/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java +++ b/server/src/test/java/org/apache/druid/client/selector/TierSelectorStrategyTest.java @@ -24,6 +24,7 @@ import org.apache.druid.client.QueryableDruidServer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.CloneQueryMode; import org.apache.druid.query.Query; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; @@ -226,7 +227,8 @@ private void testTierSelectorStrategy( 0, 0L ), - tierSelectorStrategy + tierSelectorStrategy, + HistoricalFilter.IDENTITY_FILTER ); List servers = new ArrayList<>(Arrays.asList(expectedSelection)); @@ -240,10 +242,10 @@ private void testTierSelectorStrategy( serverSelector.addServerAndUpdateSegment(server, serverSelector.getSegment()); } - Assert.assertEquals(expectedSelection[0], serverSelector.pick(null)); - Assert.assertEquals(expectedSelection[0], serverSelector.pick(EasyMock.createMock(Query.class))); - Assert.assertEquals(expectedCandidates, serverSelector.getCandidates(-1)); - Assert.assertEquals(expectedCandidates.subList(0, 2), serverSelector.getCandidates(2)); + Assert.assertEquals(expectedSelection[0], serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES)); + Assert.assertEquals(expectedSelection[0], serverSelector.pick(EasyMock.createMock(Query.class), CloneQueryMode.EXCLUDECLONES)); + Assert.assertEquals(expectedCandidates, serverSelector.getCandidates(-1, CloneQueryMode.EXCLUDECLONES)); + Assert.assertEquals(expectedCandidates.subList(0, 2), serverSelector.getCandidates(2, CloneQueryMode.EXCLUDECLONES)); } @Test diff --git a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java 
b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java index d7f2d98e8c44..8e9f0c54c12c 100644 --- a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java @@ -27,6 +27,7 @@ import org.apache.druid.client.FilteredServerInventoryView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; +import org.apache.druid.client.selector.HistoricalFilter; import org.apache.druid.client.selector.RandomServerSelectorStrategy; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.java.util.common.Intervals; @@ -368,7 +369,7 @@ private void addSegment( .size(1) .build(); server.addDataSegment(segment); - ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())); + ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), HistoricalFilter.IDENTITY_FILTER); timeline.add(Intervals.of(interval), version, new SingleElementPartitionChunk<>(ss)); } @@ -392,7 +393,7 @@ private void addSegmentWithShardSpec( .size(1) .build(); server.addDataSegment(segment); - ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())); + ServerSelector ss = new ServerSelector(segment, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), HistoricalFilter.IDENTITY_FILTER); timeline.add(Intervals.of(interval), version, shardSpec.createChunk(ss)); } diff --git a/server/src/test/java/org/apache/druid/server/coordination/TestCoordinatorClient.java b/server/src/test/java/org/apache/druid/server/coordination/TestCoordinatorClient.java index 9f297ddd39eb..d9dc42aadae5 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/TestCoordinatorClient.java +++ 
b/server/src/test/java/org/apache/druid/server/coordination/TestCoordinatorClient.java @@ -24,23 +24,31 @@ import org.apache.druid.client.BootstrapSegmentsResponse; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.timeline.DataSegment; import java.util.HashSet; import java.util.Set; -class TestCoordinatorClient extends NoopCoordinatorClient +public class TestCoordinatorClient extends NoopCoordinatorClient { private final Set bootstrapSegments; + private final CoordinatorDynamicConfig config; - TestCoordinatorClient() + public TestCoordinatorClient() { - this(new HashSet<>()); + this(new HashSet<>(), CoordinatorDynamicConfig.builder().build()); } - TestCoordinatorClient(final Set bootstrapSegments) + public TestCoordinatorClient(final Set bootstrapSegments) + { + this(bootstrapSegments, CoordinatorDynamicConfig.builder().build()); + } + + public TestCoordinatorClient(final Set bootstrapSegments, final CoordinatorDynamicConfig config) { this.bootstrapSegments = bootstrapSegments; + this.config = config; } @Override @@ -50,4 +58,10 @@ public ListenableFuture fetchBootstrapSegments() new BootstrapSegmentsResponse(CloseableIterators.withEmptyBaggage(bootstrapSegments.iterator())) ); } + + @Override + public ListenableFuture getCoordinatorDynamicConfig() + { + return Futures.immediateFuture(config); + } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index fa6a4e9bdc56..182fce84cb0c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -71,6 +71,7 @@ import org.apache.druid.server.coordinator.rules.IntervalLoadRule; import 
org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.server.http.CoordinatorDynamicConfigSyncer; import org.apache.druid.server.lookup.cache.LookupCoordinatorManager; import org.apache.druid.timeline.DataSegment; import org.easymock.EasyMock; @@ -167,7 +168,9 @@ public void setUp() throws Exception new TestDruidLeaderSelector(), null, CentralizedDatasourceSchemaConfig.create(), - new CompactionStatusTracker(OBJECT_MAPPER) + new CompactionStatusTracker(OBJECT_MAPPER), + EasyMock.niceMock(CoordinatorDynamicConfigSyncer.class), + EasyMock.niceMock(CloneStatusManager.class) ); } @@ -476,7 +479,9 @@ public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty() new TestDruidLeaderSelector(), null, CentralizedDatasourceSchemaConfig.create(), - new CompactionStatusTracker(OBJECT_MAPPER) + new CompactionStatusTracker(OBJECT_MAPPER), + EasyMock.niceMock(CoordinatorDynamicConfigSyncer.class), + EasyMock.niceMock(CloneStatusManager.class) ); coordinator.start(); @@ -526,7 +531,9 @@ public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupDoesNotContainsC new TestDruidLeaderSelector(), null, CentralizedDatasourceSchemaConfig.create(), - new CompactionStatusTracker(OBJECT_MAPPER) + new CompactionStatusTracker(OBJECT_MAPPER), + EasyMock.niceMock(CoordinatorDynamicConfigSyncer.class), + EasyMock.niceMock(CloneStatusManager.class) ); coordinator.start(); // Since CompactSegments is not enabled in Custom Duty Group, then CompactSegments must be created in IndexingServiceDuties @@ -576,7 +583,9 @@ public void testInitializeCompactSegmentsDutyWhenCustomDutyGroupContainsCompactS new TestDruidLeaderSelector(), null, CentralizedDatasourceSchemaConfig.create(), - new CompactionStatusTracker(OBJECT_MAPPER) + new CompactionStatusTracker(OBJECT_MAPPER), + EasyMock.niceMock(CoordinatorDynamicConfigSyncer.class), + EasyMock.niceMock(CloneStatusManager.class) ); coordinator.start(); @@ -688,7 +697,9 @@ public 
void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception new TestDruidLeaderSelector(), null, CentralizedDatasourceSchemaConfig.create(), - new CompactionStatusTracker(OBJECT_MAPPER) + new CompactionStatusTracker(OBJECT_MAPPER), + EasyMock.niceMock(CoordinatorDynamicConfigSyncer.class), + EasyMock.niceMock(CloneStatusManager.class) ); coordinator.start(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ServerCloneStatusTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ServerCloneStatusTest.java new file mode 100644 index 000000000000..f5870cdca71f --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/ServerCloneStatusTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.coordinator; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class ServerCloneStatusTest +{ + @Test + public void testSerde() throws Exception + { + ServerCloneStatus metrics = new ServerCloneStatus("host2", "host1", ServerCloneStatus.State.IN_PROGRESS, 3012, 10, 100); + byte[] bytes = DefaultObjectMapper.INSTANCE.writeValueAsBytes(metrics); + ServerCloneStatus deserialized = DefaultObjectMapper.INSTANCE.readValue(bytes, ServerCloneStatus.class); + Assert.assertEquals(deserialized, metrics); + } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(ServerCloneStatus.class) + .withNonnullFields("sourceServer", "state") + .usingGetClass() + .verify(); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index ffceedb21f94..70237158a007 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -43,6 +43,7 @@ import org.apache.druid.rpc.indexing.SegmentUpdateResponse; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.server.compaction.CompactionStatusTracker; +import org.apache.druid.server.coordinator.CloneStatusManager; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.DruidCompactionConfig; @@ -63,6 +64,7 @@ import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster; import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager; import 
org.apache.druid.server.coordinator.rules.Rule; +import org.apache.druid.server.http.CoordinatorDynamicConfigSyncer; import org.apache.druid.server.http.SegmentsToUpdateFilter; import org.apache.druid.server.lookup.cache.LookupCoordinatorManager; import org.apache.druid.timeline.DataSegment; @@ -220,7 +222,9 @@ public CoordinatorSimulation build() env.leaderSelector, null, CentralizedDatasourceSchemaConfig.create(), - new CompactionStatusTracker(OBJECT_MAPPER) + new CompactionStatusTracker(OBJECT_MAPPER), + env.configSyncer, + env.cloneStatusManager ); return new SimulationImpl(coordinator, env); @@ -420,6 +424,8 @@ private static class Environment private final MetadataManager metadataManager; private final LookupCoordinatorManager lookupCoordinatorManager; private final DruidCoordinatorConfig coordinatorConfig; + private final CoordinatorDynamicConfigSyncer configSyncer; + private final CloneStatusManager cloneStatusManager; private final boolean loadImmediately; private final boolean autoSyncInventory; @@ -483,6 +489,12 @@ private Environment( null, null ); + + this.configSyncer = EasyMock.niceMock(CoordinatorDynamicConfigSyncer.class); + this.cloneStatusManager = EasyMock.niceMock(CloneStatusManager.class); + + mocks.add(configSyncer); + mocks.add(cloneStatusManager); } private void setUp() throws Exception diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index 98324792ef7b..22f3ccbdd4db 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -20,7 +20,9 @@ package org.apache.druid.server.http; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; import 
org.apache.druid.segment.TestHelper; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.utils.JvmUtils; @@ -28,6 +30,7 @@ import org.junit.Test; import javax.annotation.Nullable; +import java.util.Map; import java.util.Set; public class CoordinatorDynamicConfigTest @@ -52,7 +55,8 @@ public void testSerde() throws Exception + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" + " \"pauseCoordination\": false,\n" + " \"replicateAfterLoadTimeout\": false,\n" - + " \"turboLoadingNodes\":[\"host1\", \"host3\"]\n" + + " \"turboLoadingNodes\":[\"host1\", \"host3\"],\n" + + " \"cloneServers\":{\"host5\": \"host6\"}\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -67,6 +71,7 @@ public void testSerde() throws Exception ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); ImmutableSet turboLoadingNodes = ImmutableSet.of("host1", "host3"); + ImmutableMap cloneServers = ImmutableMap.of("host5", "host6"); assertConfig( actual, 1, @@ -81,7 +86,8 @@ public void testSerde() throws Exception decommissioning, false, false, - turboLoadingNodes + turboLoadingNodes, + cloneServers ); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); @@ -99,7 +105,8 @@ public void testSerde() throws Exception ImmutableSet.of("host1"), false, false, - turboLoadingNodes + turboLoadingNodes, + cloneServers ); actual = CoordinatorDynamicConfig.builder().build(actual); @@ -117,7 +124,8 @@ public void testSerde() throws Exception ImmutableSet.of("host1"), false, false, - turboLoadingNodes + turboLoadingNodes, + cloneServers ); actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); @@ -135,7 +143,8 @@ public void testSerde() throws Exception ImmutableSet.of("host1"), true, false, - turboLoadingNodes + turboLoadingNodes, + cloneServers ); actual = 
CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual); @@ -153,7 +162,8 @@ public void testSerde() throws Exception ImmutableSet.of("host1"), true, true, - turboLoadingNodes + turboLoadingNodes, + cloneServers ); actual = CoordinatorDynamicConfig.builder().build(actual); @@ -171,7 +181,8 @@ public void testSerde() throws Exception ImmutableSet.of("host1"), true, true, - turboLoadingNodes + turboLoadingNodes, + cloneServers ); actual = CoordinatorDynamicConfig.builder().withKillTaskSlotRatio(0.1).build(actual); @@ -189,7 +200,8 @@ public void testSerde() throws Exception ImmutableSet.of("host1"), true, true, - turboLoadingNodes + turboLoadingNodes, + cloneServers ); actual = CoordinatorDynamicConfig.builder().withMaxKillTaskSlots(5).build(actual); @@ -207,7 +219,29 @@ public void testSerde() throws Exception ImmutableSet.of("host1"), true, true, - turboLoadingNodes + turboLoadingNodes, + cloneServers + ); + + actual = CoordinatorDynamicConfig.builder() + .withTurboLoadingNodes(ImmutableSet.of("host3")) + .withCloneServers(ImmutableMap.of("host3", "host4")).build(actual); + assertConfig( + actual, + 1, + 1, + 1, + 1, + 2, + whitelist, + 0.1, + 5, + 1, + ImmutableSet.of("host1"), + true, + true, + ImmutableSet.of("host3"), + ImmutableMap.of("host3", "host4") ); } @@ -287,7 +321,8 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio + " \"balancerComputeThreads\": 2, \n" + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" + " \"maxSegmentsInNodeLoadingQueue\": 1,\n" - + " \"turboLoadingNodes\": [\"host3\",\"host4\"]\n" + + " \"turboLoadingNodes\": [\"host3\",\"host4\"],\n" + + " \"cloneServers\": {\"host3\":\"host4\", \"host5\":\"host6\"}\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -302,6 +337,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", 
"test2"); ImmutableSet turboLoading = ImmutableSet.of("host3", "host4"); + ImmutableMap cloneServers = ImmutableMap.of("host3", "host4", "host5", "host6"); assertConfig( actual, 1, @@ -316,7 +352,8 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio decommissioning, false, false, - turboLoading + turboLoading, + cloneServers ); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); @@ -334,7 +371,8 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ImmutableSet.of("host1"), false, false, - turboLoading + turboLoading, + cloneServers ); actual = CoordinatorDynamicConfig.builder().build(actual); @@ -352,7 +390,8 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ImmutableSet.of("host1"), false, false, - turboLoading + turboLoading, + cloneServers ); } @@ -392,7 +431,8 @@ public void testSerdeWithStringInKillDataSourceWhitelist() throws Exception ImmutableSet.of(), false, false, - ImmutableSet.of() + ImmutableSet.of(), + ImmutableMap.of() ); } @@ -432,7 +472,8 @@ public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Excepti decommissioning, false, false, - ImmutableSet.of() + ImmutableSet.of(), + ImmutableMap.of() ); } @@ -468,7 +509,8 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti ImmutableSet.of(), false, false, - ImmutableSet.of() + ImmutableSet.of(), + ImmutableMap.of() ); } @@ -491,7 +533,8 @@ public void testBuilderDefaults() emptyList, false, false, - ImmutableSet.of() + ImmutableSet.of(), + ImmutableMap.of() ); } @@ -517,7 +560,8 @@ public void testBuilderWithDefaultSpecificDataSourcesToKillUnusedSegmentsInSpeci ImmutableSet.of(), false, false, - ImmutableSet.of() + ImmutableSet.of(), + ImmutableMap.of() ); } @@ -570,7 +614,8 @@ private void assertConfig( Set decommissioningNodes, boolean pauseCoordination, boolean replicateAfterLoadTimeout, - Set 
turboLoadingNodes + Set turboLoadingNodes, + Map cloneServers ) { Assert.assertEquals( @@ -592,10 +637,20 @@ private void assertConfig( Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout()); Assert.assertEquals(turboLoadingNodes, config.getTurboLoadingNodes()); + Assert.assertEquals(cloneServers, config.getCloneServers()); } private static int getDefaultNumBalancerThreads() { return Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() / 2); } + + @Test + public void testEquals() + { + EqualsVerifier.forClass(CoordinatorDynamicConfig.class) + .withIgnoredFields("validDebugDimensions") + .usingGetClass() + .verify(); + } } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java new file mode 100644 index 000000000000..87d99b0759bd --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigsResourceTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server.http; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.audit.AuditManager; +import org.apache.druid.server.coordinator.CloneStatusManager; +import org.apache.druid.server.coordinator.CoordinatorConfigManager; +import org.apache.druid.server.coordinator.ServerCloneStatus; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.ws.rs.core.Response; +import java.util.List; + +public class CoordinatorDynamicConfigsResourceTest +{ + private CoordinatorConfigManager manager; + private AuditManager auditManager; + private CoordinatorDynamicConfigSyncer coordinatorDynamicConfigSyncer; + private CloneStatusManager cloneStatusManager; + + @Before + public void setUp() throws Exception + { + manager = EasyMock.createStrictMock(CoordinatorConfigManager.class); + auditManager = EasyMock.createStrictMock(AuditManager.class); + coordinatorDynamicConfigSyncer = EasyMock.createStrictMock(CoordinatorDynamicConfigSyncer.class); + cloneStatusManager = EasyMock.createStrictMock(CloneStatusManager.class); + } + + @Test + public void testGetBrokerStatus() + { + EasyMock.expect(coordinatorDynamicConfigSyncer.getInSyncBrokers()) + .andReturn( + ImmutableSet.of( + new BrokerSyncStatus("host1", 8080, 1000 + ) + ) + ) + .once(); + EasyMock.replay(coordinatorDynamicConfigSyncer); + EasyMock.replay(cloneStatusManager); + + final Response response = new CoordinatorDynamicConfigsResource( + manager, + auditManager, + coordinatorDynamicConfigSyncer, + cloneStatusManager + ).getBrokerStatus(); + + Assert.assertEquals(200, response.getStatus()); + + ConfigSyncStatus expected = new ConfigSyncStatus( + ImmutableSet.of( + new BrokerSyncStatus("host1", 8080, 1000) + ) + ); + Assert.assertEquals(expected, response.getEntity()); + } + + @Test + public void testGetCloneStatus() + { + List statusMetrics = ImmutableList.of( + new 
ServerCloneStatus("hist3", "hist1", ServerCloneStatus.State.IN_PROGRESS, 2, 0, 1000), + ServerCloneStatus.unknown("hist4", "hist3") + ); + + EasyMock.expect(cloneStatusManager.getStatusForAllServers()).andReturn(statusMetrics).once(); + EasyMock.expect(cloneStatusManager.getStatusForServer("hist2")).andReturn(ServerCloneStatus.unknown("hist4", "hist3")).once(); + EasyMock.replay(coordinatorDynamicConfigSyncer); + EasyMock.replay(cloneStatusManager); + + CoordinatorDynamicConfigsResource resource = new CoordinatorDynamicConfigsResource( + manager, + auditManager, + coordinatorDynamicConfigSyncer, + cloneStatusManager + ); + Response response = resource.getCloneStatus(null); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(new CloneStatus(statusMetrics), response.getEntity()); + + response = resource.getCloneStatus("hist2"); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ServerCloneStatus.unknown("hist4", "hist3"), response.getEntity()); + } +} diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java index 42bb7b51421c..7af834942bdf 100644 --- a/services/src/main/java/org/apache/druid/cli/CliBroker.java +++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java @@ -28,6 +28,7 @@ import com.google.inject.name.Names; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.BrokerServerView; +import org.apache.druid.client.BrokerViewOfCoordinatorConfig; import org.apache.druid.client.CachingClusteredClient; import org.apache.druid.client.DirectDruidClientFactory; import org.apache.druid.client.HttpServerInventoryViewResource; @@ -57,6 +58,7 @@ import org.apache.druid.query.QuerySegmentWalker; import org.apache.druid.query.RetryQueryRunnerConfig; import org.apache.druid.query.lookup.LookupModule; +import org.apache.druid.server.BrokerDynamicConfigResource; import org.apache.druid.server.BrokerQueryResource; 
import org.apache.druid.server.ClientInfoResource; import org.apache.druid.server.ClientQuerySegmentWalker; @@ -160,6 +162,7 @@ protected List getModules() binder.bind(SubqueryCountStatsProvider.class).toInstance(new SubqueryCountStatsProvider()); Jerseys.addResource(binder, BrokerResource.class); Jerseys.addResource(binder, ClientInfoResource.class); + Jerseys.addResource(binder, BrokerDynamicConfigResource.class); LifecycleModule.register(binder, BrokerQueryResource.class); @@ -167,6 +170,7 @@ protected List getModules() LifecycleModule.register(binder, Server.class); binder.bind(SegmentManager.class).in(LazySingleton.class); + binder.bind(BrokerViewOfCoordinatorConfig.class).in(ManageLifecycle.class); binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.BROKER)); Jerseys.addResource(binder, HistoricalResource.class); diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 1f61d9716eba..993f3f7bf4c1 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -96,6 +96,7 @@ import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QuerySchedulerProvider; import org.apache.druid.server.compaction.CompactionStatusTracker; +import org.apache.druid.server.coordinator.CloneStatusManager; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.MetadataManager; @@ -112,6 +113,7 @@ import org.apache.druid.server.http.ClusterResource; import org.apache.druid.server.http.CoordinatorCompactionConfigsResource; import org.apache.druid.server.http.CoordinatorCompactionResource; +import org.apache.druid.server.http.CoordinatorDynamicConfigSyncer; import 
org.apache.druid.server.http.CoordinatorDynamicConfigsResource; import org.apache.druid.server.http.CoordinatorRedirectInfo; import org.apache.druid.server.http.CoordinatorResource; @@ -224,6 +226,7 @@ public void configure(Binder binder) binder.bind(DruidCoordinatorConfig.class); binder.bind(RedirectFilter.class).in(LazySingleton.class); + binder.bind(CoordinatorDynamicConfigSyncer.class).in(LazySingleton.class); if (beOverlord) { binder.bind(RedirectInfo.class).to(CoordinatorOverlordRedirectInfo.class).in(LazySingleton.class); } else { @@ -246,6 +249,7 @@ public void configure(Binder binder) .in(ManageLifecycle.class); binder.bind(LookupCoordinatorManager.class).in(LazySingleton.class); + binder.bind(CloneStatusManager.class).in(LazySingleton.class); binder.bind(CoordinatorConfigManager.class); binder.bind(MetadataManager.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java index eb92a090d62b..f1a8dd0be1c1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.Sets; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.BrokerServerView; +import org.apache.druid.client.BrokerViewOfCoordinatorConfig; import org.apache.druid.client.DirectDruidClient; import org.apache.druid.client.DirectDruidClientFactory; import org.apache.druid.client.DruidServer; @@ -58,6 +59,7 @@ import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.server.coordination.TestCoordinatorClient; import 
org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.NoopEscalator; import org.apache.druid.timeline.DataSegment; @@ -387,12 +389,15 @@ private static BrokerServerView newBrokerServerView(FilteredServerInventoryView .anyTimes(); EasyMock.replay(druidClientFactory); + BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig(new TestCoordinatorClient()); + filter.start(); return new BrokerServerView( druidClientFactory, baseView, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), new NoopServiceEmitter(), - new BrokerSegmentWatcherConfig() + new BrokerSegmentWatcherConfig(), + filter ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java index 88df3b80a4e5..a23cefacb4fa 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java @@ -26,6 +26,7 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy; +import org.apache.druid.client.selector.HistoricalFilter; import org.apache.druid.client.selector.RandomServerSelectorStrategy; import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.client.selector.TierSelectorStrategy; @@ -108,7 +109,7 @@ public Optional> getTimeline(Ta Comparator.naturalOrder() ); TierSelectorStrategy st = new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()); - ServerSelector sss = new ServerSelector(segment, st); + ServerSelector sss = new ServerSelector(segment, st, HistoricalFilter.IDENTITY_FILTER); PartitionChunk partitionChunk = new SingleElementPartitionChunk(sss); timelineLookup.add(segment.getInterval(), segment.getVersion(), partitionChunk);