diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index 6fa37a896a72..20c8d5f2765f 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -478,7 +478,7 @@ public void groupByQuery(Blackhole blackhole)
   private List runQuery()
   {
     //noinspection unchecked
-    QueryRunner theRunner = new FluentQueryRunnerBuilder<>(toolChestWarehouse.getToolChest(query))
+    QueryRunner theRunner = new FluentQueryRunnerBuilder(toolChestWarehouse.getToolChest(query))
         .create(cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals()))
         .applyPreMergeDecoration()
         .mergeResults()
diff --git a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index 8b198d1e0de7..c2d68855da84 100644
--- a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ b/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -40,6 +40,7 @@
 import org.apache.druid.query.GenericQueryMetricsFactory;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.server.log.RequestLogger;
 import org.apache.druid.server.metrics.QueryCountStatsProvider;
@@ -533,7 +534,7 @@ private void emitQueryTime(long requestTimeNs, boolean success)
   {
     QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
         queryMetricsFactory,
-        warehouse.getToolChest(query),
+        (QueryToolChest) warehouse.getToolChest(query),
         query,
         req.getRemoteAddr()
     );
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index d8d280031222..b3972f19d440 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -69,6 +69,8 @@
 import org.apache.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
 import org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentCompactor;
 import org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentInfoLoader;
+import org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentReplicantLookupConsumer;
+import org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentReplicantLookupConsumer.DruidCoordinatorContext;
 import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -90,6 +92,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -688,7 +691,35 @@ public void run()
       }
     }
   }
+
+  private class DruidCoordinatorInstanceContextSupplier implements Supplier
+  {
+
+    @Override
+    public DruidCoordinatorContext get()
+    {
+      return new DruidCoordinatorContext(serverInventoryView, loadManagementPeons, taskMaster, metadataRuleManager);
+    }
+
+  }
+
+  private class DruidCoordinatorInstanceSegmentReplicantLookupConsumer
+      extends DruidCoordinatorSegmentReplicantLookupConsumer
+  {
+
+    public DruidCoordinatorInstanceSegmentReplicantLookupConsumer()
+    {
+      super(new DruidCoordinatorInstanceContextSupplier());
+    }
+
+    @Override
+    public void accept(SegmentReplicantLookup t)
+    {
+      segmentReplicantLookup = t;
+    }
+
+  }
+
   private class CoordinatorHistoricalManagerRunnable extends CoordinatorRunnable
   {
     public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
@@ -696,68 +727,7 @@ public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
       super(
           ImmutableList.of(
               new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
-              params -> {
-                List servers = serverInventoryView
-                    .getInventory()
-                    .stream()
-                    .filter(DruidServer::segmentReplicatable)
-                    .map(DruidServer::toImmutableDruidServer)
-                    .collect(Collectors.toList());
-
-                if (log.isDebugEnabled()) {
-                  // Display info about all historical servers
-                  log.debug("Servers");
-                  for (ImmutableDruidServer druidServer : servers) {
-                    log.debug("  %s", druidServer);
-                    log.debug("    -- DataSources");
-                    for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
-                      log.debug("    %s", druidDataSource);
-                    }
-                  }
-                }
-
-                // Find all historical servers, group them by subType and sort by ascending usage
-                Set decommissioningServers = params.getCoordinatorDynamicConfig().getDecommissioningNodes();
-                final DruidCluster cluster = new DruidCluster();
-                for (ImmutableDruidServer server : servers) {
-                  if (!loadManagementPeons.containsKey(server.getName())) {
-                    LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
-                    loadQueuePeon.start();
-                    log.debug("Created LoadQueuePeon for server[%s].", server.getName());
-
-                    loadManagementPeons.put(server.getName(), loadQueuePeon);
-                  }
-
-                  cluster.add(
-                      new ServerHolder(
-                          server,
-                          loadManagementPeons.get(server.getName()),
-                          decommissioningServers.contains(server.getHost())
-                      )
-                  );
-                }
-
-                segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
-
-                // Stop peons for servers that aren't there anymore.
-                final Set disappeared = Sets.newHashSet(loadManagementPeons.keySet());
-                for (ImmutableDruidServer server : servers) {
-                  disappeared.remove(server.getName());
-                }
-                for (String name : disappeared) {
-                  log.debug("Removing listener for server[%s] which is no longer there.", name);
-                  LoadQueuePeon peon = loadManagementPeons.remove(name);
-                  peon.stop();
-                }
-
-                return params.buildFromExisting()
-                    .withDruidCluster(cluster)
-                    .withDatabaseRuleManager(metadataRuleManager)
-                    .withLoadManagementPeons(loadManagementPeons)
-                    .withSegmentReplicantLookup(segmentReplicantLookup)
-                    .withBalancerReferenceTimestamp(DateTimes.nowUtc())
-                    .build();
-              },
+              new DruidCoordinatorInstanceSegmentReplicantLookupConsumer(),
              new DruidCoordinatorRuleRunner(DruidCoordinator.this),
              new DruidCoordinatorCleanupUnneeded(),
              new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this),
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentReplicantLookupConsumer.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentReplicantLookupConsumer.java
new file mode 100644
index 000000000000..4ce31cdef788
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentReplicantLookupConsumer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.helper;
+
+import com.google.common.collect.Sets;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.ServerInventoryView;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.LoadQueuePeon;
+import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.SegmentReplicantLookup;
+import org.apache.druid.server.coordinator.ServerHolder;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public abstract class DruidCoordinatorSegmentReplicantLookupConsumer implements DruidCoordinatorHelper, Consumer
+{
+
+  private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorSegmentReplicantLookupConsumer.class);
+
+  private final Supplier druidCoordinatorContextSupplier;
+
+  public DruidCoordinatorSegmentReplicantLookupConsumer(
+      Supplier druidCoordinatorContextSupplier)
+  {
+    this.druidCoordinatorContextSupplier = druidCoordinatorContextSupplier;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
+  {
+    DruidCoordinatorContext druidCoordinatorContext = druidCoordinatorContextSupplier.get();
+    ServerInventoryView serverInventoryView = druidCoordinatorContext.getServerInventoryView();
+    Map loadManagementPeons = druidCoordinatorContext.getLoadManagementPeons();
+    LoadQueueTaskMaster taskMaster = druidCoordinatorContext.getTaskMaster();
+    MetadataRuleManager metadataRuleManager = druidCoordinatorContext.getMetadataRuleManager();
+
+    List servers = serverInventoryView
+        .getInventory()
+        .stream()
+        .filter(DruidServer::segmentReplicatable)
+        .map(DruidServer::toImmutableDruidServer)
+        .collect(Collectors.toList());
+
+    if (log.isDebugEnabled()) {
+      // Display info about all historical servers
+      log.debug("Servers");
+      for (ImmutableDruidServer druidServer : servers) {
+        log.debug("  %s", druidServer);
+        log.debug("    -- DataSources");
+        for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) {
+          log.debug("    %s", druidDataSource);
+        }
+      }
+    }
+
+    // Find all historical servers, group them by subType and sort by ascending usage
+    Set decommissioningServers = params.getCoordinatorDynamicConfig().getDecommissioningNodes();
+    final DruidCluster cluster = new DruidCluster();
+    for (ImmutableDruidServer server : servers) {
+      if (!loadManagementPeons.containsKey(server.getName())) {
+        LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
+        loadQueuePeon.start();
+        log.debug("Created LoadQueuePeon for server[%s].", server.getName());
+
+        loadManagementPeons.put(server.getName(), loadQueuePeon);
+      }
+
+      cluster.add(
+          new ServerHolder(
+              server,
+              loadManagementPeons.get(server.getName()),
+              decommissioningServers.contains(server.getHost())
+          )
+      );
+    }
+
+    SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(cluster);
+    accept(segmentReplicantLookup);
+
+    // Stop peons for servers that aren't there anymore.
+    final Set disappeared = Sets.newHashSet(loadManagementPeons.keySet());
+    for (ImmutableDruidServer server : servers) {
+      disappeared.remove(server.getName());
+    }
+    for (String name : disappeared) {
+      log.debug("Removing listener for server[%s] which is no longer there.", name);
+      LoadQueuePeon peon = loadManagementPeons.remove(name);
+      peon.stop();
+    }
+
+    return params.buildFromExisting()
+        .withDruidCluster(cluster)
+        .withDatabaseRuleManager(metadataRuleManager)
+        .withLoadManagementPeons(loadManagementPeons)
+        .withSegmentReplicantLookup(segmentReplicantLookup)
+        .withBalancerReferenceTimestamp(DateTimes.nowUtc())
+        .build();
+  }
+
+  public static class DruidCoordinatorContext
+  {
+
+    private final ServerInventoryView serverInventoryView;
+    private final Map loadManagementPeons;
+    private final LoadQueueTaskMaster taskMaster;
+    private final MetadataRuleManager metadataRuleManager;
+
+    public DruidCoordinatorContext(ServerInventoryView serverInventoryView,
+        Map loadManagementPeons, LoadQueueTaskMaster taskMaster,
+        MetadataRuleManager metadataRuleManager)
+    {
+      this.serverInventoryView = serverInventoryView;
+      this.loadManagementPeons = loadManagementPeons;
+      this.taskMaster = taskMaster;
+      this.metadataRuleManager = metadataRuleManager;
+    }
+
+    public ServerInventoryView getServerInventoryView()
+    {
+      return serverInventoryView;
+    }
+
+    public Map getLoadManagementPeons()
+    {
+      return loadManagementPeons;
+    }
+
+    public LoadQueueTaskMaster getTaskMaster()
+    {
+      return taskMaster;
+    }
+
+    public MetadataRuleManager getMetadataRuleManager()
+    {
+      return metadataRuleManager;
+    }
+
+  }
+
+}