From ad65ed48ad15f67fc4a3879270f87141b6662074 Mon Sep 17 00:00:00 2001 From: "dmitry.golitsyn" Date: Tue, 29 Aug 2017 18:18:40 -0500 Subject: [PATCH 1/5] Add CachingCostBalancerStrategy; Rename ServerView.ServerCallback to ServerRemovedCallback --- .../CachingCostBalancerStrategyBenchmark.java | 120 +++++ .../druid/indexing/test/TestServerView.java | 2 +- .../AbstractCuratorServerInventoryView.java | 6 +- .../io/druid/client/BrokerServerView.java | 8 +- .../druid/client/CoordinatorServerView.java | 4 +- .../client/FilteredServerInventoryView.java | 2 +- .../druid/client/HttpServerInventoryView.java | 6 +- .../main/java/io/druid/client/ServerView.java | 4 +- .../CachingCostBalancerStrategy.java | 75 +++ .../CachingCostBalancerStrategyFactory.java | 163 +++++++ .../coordinator/cost/ClusterCostCache.java | 92 ++++ .../coordinator/cost/SegmentsCostCache.java | 432 ++++++++++++++++++ .../coordinator/cost/ServerCostCache.java | 99 ++++ ...chingClusteredClientFunctionalityTest.java | 2 +- .../client/CachingClusteredClientTest.java | 2 +- .../cost/CachingCostBalancerStrategyTest.java | 176 +++++++ .../cost/SegmentsCostCacheTest.java | 194 ++++++++ .../java/io/druid/cli/CliRealtimeExample.java | 4 +- .../calcite/util/TestServerInventoryView.java | 6 +- 19 files changed, 1372 insertions(+), 25 deletions(-) create mode 100644 benchmarks/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyBenchmark.java create mode 100644 server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategy.java create mode 100644 server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java create mode 100644 server/src/main/java/io/druid/server/coordinator/cost/ClusterCostCache.java create mode 100644 server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java create mode 100644 server/src/main/java/io/druid/server/coordinator/cost/ServerCostCache.java create mode 100644 
server/src/test/java/io/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java create mode 100644 server/src/test/java/io/druid/server/coordinator/cost/SegmentsCostCacheTest.java diff --git a/benchmarks/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyBenchmark.java b/benchmarks/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyBenchmark.java new file mode 100644 index 000000000000..85d652dbfeb7 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyBenchmark.java @@ -0,0 +1,120 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordinator; + +import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.logger.Logger; +import io.druid.server.coordinator.cost.SegmentsCostCache; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +public class CachingCostBalancerStrategyBenchmark +{ + private static final Logger log = new Logger(CachingCostBalancerStrategyBenchmark.class); + + private static final int NUMBER_OF_SEGMENTS = 100000; + private static final int NUMBER_OF_QUERIES = 500; + + private static final long DAYS_IN_MONTH = 30; + + private final DateTime referenceTime = DateTimes.of("2014-01-01T00:00:00"); + private final Set segments = new HashSet<>(); + private final Set segmentQueries = new HashSet<>(); + private final int seed = ThreadLocalRandom.current().nextInt(); + + private SegmentsCostCache segmentsCostCache; + + @Setup + public void createSegments() + { + Random random = new Random(seed); + SegmentsCostCache.Builder prototype = SegmentsCostCache.builder(); + for (int i = 0; i < NUMBER_OF_SEGMENTS; ++i) { + DataSegment segment = createSegment(random.nextInt((int) TimeUnit.DAYS.toHours(DAYS_IN_MONTH))); + segments.add(segment); + } + segmentsCostCache = prototype.build(); + for (int i = 0; i < NUMBER_OF_QUERIES; ++i) { + 
DataSegment segment = createSegment(random.nextInt((int) TimeUnit.DAYS.toHours(DAYS_IN_MONTH))); + segmentQueries.add(segment); + } + for (DataSegment segment : segments) { + prototype.addSegment(segment); + } + + log.info("GENERATING SEGMENTS : %d / %d", NUMBER_OF_SEGMENTS, NUMBER_OF_QUERIES); + } + + @Benchmark + public double measureCostStrategy() throws InterruptedException + { + double cost = 0.0; + for (DataSegment segment : segmentQueries) { + cost += CostBalancerStrategy.computeJointSegmentsCost(segment, segments); + } + return cost; + } + + @Benchmark + public double measureCachingCostStrategy() throws InterruptedException + { + double cost = 0.0; + for (DataSegment segment : segmentQueries) { + cost += segmentsCostCache.cost(segment); + } + return cost; + } + + private DataSegment createSegment(int shift) + { + return new DataSegment( + "dataSource", + new Interval(referenceTime.plusHours(shift), referenceTime.plusHours(shift).plusHours(1)), + "version", + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + null, + 0, + 100 + ); + } + +} diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java index 1d68061b4feb..b8ab8c8141a9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java @@ -48,7 +48,7 @@ public void registerSegmentCallback( } @Override - public void registerServerCallback(Executor exec, ServerView.ServerCallback callback) + public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback) { // No-op } diff --git a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java index 492347a523bb..8b6785a95fc3 100644 --- 
a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java +++ b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java @@ -52,7 +52,7 @@ public abstract class AbstractCuratorServerInventoryView implemen private final CuratorInventoryManager inventoryManager; private final AtomicBoolean started = new AtomicBoolean(false); - private final ConcurrentMap serverCallbacks = new MapMaker().makeMap(); + private final ConcurrentMap serverCallbacks = new MapMaker().makeMap(); private final ConcurrentMap segmentCallbacks = new MapMaker().makeMap(); public AbstractCuratorServerInventoryView( @@ -210,7 +210,7 @@ public Iterable getInventory() } @Override - public void registerServerCallback(Executor exec, ServerCallback callback) + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) { serverCallbacks.put(callback, exec); } @@ -249,7 +249,7 @@ public void run() protected void runServerCallbacks(final DruidServer server) { - for (final Map.Entry entry : serverCallbacks.entrySet()) { + for (final Map.Entry entry : serverCallbacks.entrySet()) { entry.getValue().execute( new Runnable() { diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 13495c552654..15af669be9cb 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -150,9 +150,9 @@ public CallbackAction segmentViewInitialized() segmentFilter ); - baseView.registerServerCallback( + baseView.registerServerRemovedCallback( exec, - new ServerView.ServerCallback() + new ServerRemovedCallback() { @Override public ServerView.CallbackAction serverRemoved(DruidServer server) @@ -322,9 +322,9 @@ public QueryRunner getQueryRunner(DruidServer server) } @Override - public void registerServerCallback(Executor exec, ServerCallback callback) + public void registerServerRemovedCallback(Executor 
exec, ServerRemovedCallback callback) { - baseView.registerServerCallback(exec, callback); + baseView.registerServerRemovedCallback(exec, callback); } @Override diff --git a/server/src/main/java/io/druid/client/CoordinatorServerView.java b/server/src/main/java/io/druid/client/CoordinatorServerView.java index a572deecd14a..7291c43bc3c5 100644 --- a/server/src/main/java/io/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/io/druid/client/CoordinatorServerView.java @@ -88,9 +88,9 @@ public ServerView.CallbackAction segmentViewInitialized() } ); - baseView.registerServerCallback( + baseView.registerServerRemovedCallback( exec, - new ServerView.ServerCallback() + new ServerView.ServerRemovedCallback() { @Override public ServerView.CallbackAction serverRemoved(DruidServer server) diff --git a/server/src/main/java/io/druid/client/FilteredServerInventoryView.java b/server/src/main/java/io/druid/client/FilteredServerInventoryView.java index 38f7e188dc09..44ecd16a1c1a 100644 --- a/server/src/main/java/io/druid/client/FilteredServerInventoryView.java +++ b/server/src/main/java/io/druid/client/FilteredServerInventoryView.java @@ -33,5 +33,5 @@ public void registerSegmentCallback( Executor exec, ServerView.SegmentCallback callback, Predicate> filter ); - public void registerServerCallback(Executor exec, ServerView.ServerCallback callback); + public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback); } diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryView.java b/server/src/main/java/io/druid/client/HttpServerInventoryView.java index 71adf3493ad0..ab72b916f6e3 100644 --- a/server/src/main/java/io/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/io/druid/client/HttpServerInventoryView.java @@ -92,7 +92,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer private final LifecycleLock lifecycleLock = new LifecycleLock(); - private final ConcurrentMap 
serverCallbacks = new MapMaker().makeMap(); + private final ConcurrentMap serverCallbacks = new MapMaker().makeMap(); private final ConcurrentMap segmentCallbacks = new MapMaker().makeMap(); private final ConcurrentMap>> segmentPredicates = new MapMaker() @@ -266,7 +266,7 @@ public void registerSegmentCallback( } @Override - public void registerServerCallback(Executor exec, ServerCallback callback) + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) { serverCallbacks.put(callback, exec); } @@ -330,7 +330,7 @@ public void run() private void runServerCallbacks(final DruidServer server) { - for (final Map.Entry entry : serverCallbacks.entrySet()) { + for (final Map.Entry entry : serverCallbacks.entrySet()) { entry.getValue().execute( new Runnable() { diff --git a/server/src/main/java/io/druid/client/ServerView.java b/server/src/main/java/io/druid/client/ServerView.java index a25ab562afce..73c4c3ed3d5a 100644 --- a/server/src/main/java/io/druid/client/ServerView.java +++ b/server/src/main/java/io/druid/client/ServerView.java @@ -28,7 +28,7 @@ */ public interface ServerView { - public void registerServerCallback(Executor exec, ServerCallback callback); + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback); public void registerSegmentCallback(Executor exec, SegmentCallback callback); public enum CallbackAction @@ -37,7 +37,7 @@ public enum CallbackAction UNREGISTER, } - public static interface ServerCallback + public static interface ServerRemovedCallback { /** * Called when a server is removed. diff --git a/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategy.java new file mode 100644 index 000000000000..fa01ca58fefb --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategy.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListeningExecutorService; +import io.druid.server.coordinator.cost.ClusterCostCache; +import io.druid.timeline.DataSegment; + +import java.util.Set; + + +public class CachingCostBalancerStrategy extends CostBalancerStrategy +{ + + private final ClusterCostCache clusterCostCache; + + public CachingCostBalancerStrategy(ClusterCostCache clusterCostCache, ListeningExecutorService exec) + { + super(exec); + this.clusterCostCache = Preconditions.checkNotNull(clusterCostCache); + } + + @Override + protected double computeCost( + DataSegment proposalSegment, ServerHolder server, boolean includeCurrentServer + ) + { + final long proposalSegmentSize = proposalSegment.getSize(); + + // (optional) Don't include server if it is already serving segment + if (!includeCurrentServer && server.isServingSegment(proposalSegment)) { + return Double.POSITIVE_INFINITY; + } + + // Don't calculate cost if the server doesn't have enough space or is loading the segment + if (proposalSegmentSize > server.getAvailableSize() || 
server.isLoadingSegment(proposalSegment)) { + return Double.POSITIVE_INFINITY; + } + + final String serverName = server.getServer().getName(); + + double cost = clusterCostCache.computeCost(serverName, proposalSegment); + + // add segments that will be loaded to the cost + cost += costCacheForLoadingSegments(server).computeCost(serverName, proposalSegment); + + return cost; + } + + private ClusterCostCache costCacheForLoadingSegments(ServerHolder server) + { + final Set loadingSegments = server.getPeon().getSegmentsToLoad(); + return ClusterCostCache.builder(ImmutableMap.of(server.getServer().getName(), loadingSegments)).build(); + } + +} diff --git a/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java new file mode 100644 index 000000000000..cd0a1e7960d0 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java @@ -0,0 +1,163 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordinator; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.inject.Inject; +import com.metamx.emitter.EmittingLogger; +import io.druid.client.ServerInventoryView; +import io.druid.client.ServerView; +import io.druid.concurrent.Execs; +import io.druid.concurrent.LifecycleLock; +import io.druid.guice.ManageLifecycle; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordinator.cost.ClusterCostCache; +import io.druid.timeline.DataSegment; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@ManageLifecycle +public class CachingCostBalancerStrategyFactory implements BalancerStrategyFactory +{ + private static final EmittingLogger LOG = new EmittingLogger(CachingCostBalancerStrategyFactory.class); + + private final ServerInventoryView serverInventoryView; + private final LifecycleLock lifecycleLock = new LifecycleLock(); + private volatile boolean initialized = false; + private final ExecutorService executor = Execs.singleThreaded("CachingCostBalancerStrategy-executor"); + private final ClusterCostCache.Builder clusterCostCacheBuilder = ClusterCostCache.builder(); + + @Inject + public CachingCostBalancerStrategyFactory(ServerInventoryView serverInventoryView) + { + this.serverInventoryView = Preconditions.checkNotNull(serverInventoryView); + } + + @LifecycleStart + public void start() + { + if (!lifecycleLock.canStart()) { + throw new ISE("CachingCostBalancerStrategyFactory can 
not be started"); + } + try { + serverInventoryView.registerSegmentCallback( + executor, + new ServerView.SegmentCallback() + { + @Override + public ServerView.CallbackAction segmentAdded( + DruidServerMetadata server, DataSegment segment + ) + { + clusterCostCacheBuilder.addSegment(server.getName(), segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentRemoved( + DruidServerMetadata server, DataSegment segment + ) + { + clusterCostCacheBuilder.removeSegment(server.getName(), segment); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentViewInitialized() + { + initialized = true; + return ServerView.CallbackAction.CONTINUE; + } + } + ); + + serverInventoryView.registerServerRemovedCallback( + executor, + server -> { + clusterCostCacheBuilder.removeServer(server.getName()); + return ServerView.CallbackAction.CONTINUE; + } + ); + + lifecycleLock.started(); + } + finally { + lifecycleLock.exitStart(); + } + } + + @LifecycleStop + public void stop() + { + if (!lifecycleLock.canStop()) { + throw new ISE("CachingCostBalancerStrategyFactory can not be stopped"); + } + executor.shutdownNow(); + } + + @Override + public BalancerStrategy createBalancerStrategy(final ListeningExecutorService exec) + { + if (!lifecycleLock.awaitStarted()) { + throw new ISE("CachingCostBalancerStrategyFactory is not started"); + } + if (initialized) { + try { + CompletableFuture future = CompletableFuture.supplyAsync( + () -> new CachingCostBalancerStrategy(clusterCostCacheBuilder.build(), exec), + executor + ); + try { + return future.get(1, TimeUnit.SECONDS); + } + catch (CancellationException e) { + LOG.error("CachingCostBalancerStrategy creation has been cancelled"); + } + catch (ExecutionException e) { + LOG.error(e, "Failed to create CachingCostBalancerStrategy"); + } + catch (TimeoutException e) { + LOG.error("CachingCostBalancerStrategy creation took more than 1 second!"); + 
} + catch (InterruptedException e) { + LOG.error("CachingCostBalancerStrategy creation has been interrupted"); + Thread.currentThread().interrupt(); + } + } + catch (RejectedExecutionException e) { + LOG.error("CachingCostBalancerStrategy creation has been rejected"); + } + } else { + LOG.error("CachingCostBalancerStrategy could not be created as serverView is not initialized yet"); + } + LOG.info("Fallback to CostBalancerStrategy"); + return new CostBalancerStrategy(exec); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/cost/ClusterCostCache.java b/server/src/main/java/io/druid/server/coordinator/cost/ClusterCostCache.java new file mode 100644 index 000000000000..b8bab1282642 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/cost/ClusterCostCache.java @@ -0,0 +1,92 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordinator.cost; + +import com.google.common.base.Preconditions; +import io.druid.timeline.DataSegment; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class ClusterCostCache +{ + private final Map serversCostCache; + + public ClusterCostCache(Map serversCostCache) + { + this.serversCostCache = Preconditions.checkNotNull(serversCostCache); + } + + public double computeCost(String serverName, DataSegment dataSegment) + { + ServerCostCache serverCostCache = serversCostCache.get(serverName); + return (serverCostCache != null) ? serverCostCache.computeCost(dataSegment) : 0.0; + } + + public static Builder builder() + { + return new Builder(); + } + + public static Builder builder(Map> segmentsByServerName) + { + Builder builder = builder(); + segmentsByServerName.forEach( + (serverName, segments) -> segments.forEach(segment -> builder.addSegment(serverName, segment)) + ); + return builder; + } + + public static class Builder + { + private final Map serversCostCache = new HashMap<>(); + + public void addSegment(String serverName, DataSegment dataSegment) + { + ServerCostCache.Builder builder = serversCostCache.computeIfAbsent(serverName, s -> ServerCostCache.builder()); + builder.addSegment(dataSegment); + } + + public void removeSegment(String serverName, DataSegment dataSegment) + { + serversCostCache.computeIfPresent( + serverName, + (s, builder) -> builder.removeSegment(dataSegment).isEmpty() ? 
null : builder + ); + } + + public void removeServer(String serverName) + { + serversCostCache.remove(serverName); + } + + public ClusterCostCache build() + { + return new ClusterCostCache( + serversCostCache + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())) + ); + } + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java b/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java new file mode 100644 index 000000000000..f5d61fb85415 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java @@ -0,0 +1,432 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordinator.cost; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Ordering; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.granularity.DurationGranularity; +import io.druid.java.util.common.guava.Comparators; +import io.druid.server.coordinator.CostBalancerStrategy; +import io.druid.timeline.DataSegment; +import org.apache.commons.math3.util.FastMath; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.ListIterator; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * SegmentsCostCache provides faster way to calculate cost function proposed in {@link CostBalancerStrategy}. + * See https://github.com/druid-io/druid/pull/2972 for more details about the cost function. + * + * Joint cost for two segments (you can make formulas below readable by copy-pasting to + * https://www.codecogs.com/latex/eqneditor.php): + * + * cost(X, Y) = \int_{x_0}^{x_1} \int_{y_0}^{y_1} e^{-\lambda |x-y|}dxdy + * or + * cost(X, Y) = e^{y_0 + y_1} (e^{x_0} - e^{x_1})(e^{y_0} - e^{y_1}) (*) + * if x_0 <= x_1 <= y_0 <= y_1 + * (*) lambda coefficient is omitted for simplicity. 
+ * + * For a group of segments {S_xi}, i = {0, n} total joint cost with segment S_y could be calculated as: + * + * cost(X, Y) = \sum cost(X_i, Y) = e^{y_0 + y_1} (e^{y_0} - e^{y_1}) \sum (e^{xi_0} - e^{xi_1}) + * if xi_0 <= xi_1 <= y_0 <= y_1 + * and + * cost(X, Y) = \sum cost(X_i, Y) = (e^{y_0} - e^{y_1}) \sum e^{xi_0 + xi_1} (e^{xi_0} - e^{xi_1}) + * if y_0 <= y_1 <= xi_0 <= xi_1 + * + * SegmentsCostCache stores pre-computed sums for a group of segments {S_xi}: + * + * 1) \sum (e^{xi_0} - e^{xi_1}) -> leftSum + * 2) \sum e^{xi_0 + xi_1} (e^{xi_0} - e^{xi_1}) -> rightSum + * + * so that calculation of joint cost function for segment S_y became a O(1 + m) complexity task, where m + * is the number of segments in {S_xi} that overlaps S_y. + * + * Segments are stored in buckets. Bucket is a subset of segments contained in SegmentsCostCache, so that + * startTime of all segments inside a bucket are in the same time interval (with some granularity): + * + * |------------------------|--------------------------|-----------------------|-------- .... + * t_0 t_0+D t_0 + 2D t0 + 3D .... + * S_x1 S_x2 S_x3 S_x4 S_x5 S_x6 S_x7 S_x8 S_x9 + * bucket1 bucket2 bucket3 + * + * Reasons to store segments in Buckets: + * + * 1) Cost function tends to 0 as distance between segments' intervals increases; buckets + * are used to avoid redundant 0 calculations for thousands of times + * 2) To reduce number of calculations when segment is added or removed from SegmentsCostCache + * 3) To avoid infinite values during exponents calculations + * + */ +public class SegmentsCostCache +{ + /** + * HALF_LIFE_DAYS defines how fast joint cost function tends to 0 as distance between segments' intervals increasing. + * The value of 1 day means that cost function of co-locating two segments which have 1 days between their intervals + * is 0.5 of the cost, if the intervals are adjacent. If the distance is 2 days, then 0.25, etc. 
+ */ + private static final double HALF_LIFE_DAYS = 1.0; + private static final double LAMBDA = Math.log(2) / HALF_LIFE_DAYS; + private static final double MILLIS_FACTOR = TimeUnit.DAYS.toMillis(1) / LAMBDA; + + /** + * LIFE_THRESHOLD is used to avoid calculations for segments that are "far" + * from each other and thus cost(X,Y) ~ 0 for these segments + */ + private static final long LIFE_THRESHOLD = TimeUnit.DAYS.toMillis(30); + + /** + * Bucket interval defines duration granularity for segment buckets. Number of buckets control the trade-off + * between updates (add/remove segment operation) and joint cost calculation: + * 1) updates complexity is increasing when number of buckets is decreasing (as buckets contain more segments) + * 2) joint cost calculation complexity is increasing with increasing of buckets number + */ + private static final long BUCKET_INTERVAL = TimeUnit.DAYS.toMillis(15); + private static final DurationGranularity BUCKET_GRANULARITY = new DurationGranularity(BUCKET_INTERVAL, 0); + + private static final Comparator SEGMENT_INTERVAL_COMPARATOR = + Comparator.comparing(DataSegment::getInterval, Comparators.intervalsByStartThenEnd()); + + private static final Comparator BUCKET_INTERVAL_COMPARATOR = + Comparator.comparing(Bucket::getInterval, Comparators.intervalsByStartThenEnd()); + + private static final Ordering SEGMENT_ORDERING = Ordering.from(SEGMENT_INTERVAL_COMPARATOR); + private static final Ordering BUCKET_ORDERING = Ordering.from(BUCKET_INTERVAL_COMPARATOR); + + private final ArrayList sortedBuckets; + private final ArrayList intervals; + + public SegmentsCostCache(ArrayList sortedBuckets) + { + this.sortedBuckets = Preconditions.checkNotNull(sortedBuckets, "buckets should not be null"); + this.intervals = sortedBuckets.stream().map(Bucket::getInterval).collect(Collectors.toCollection(ArrayList::new)); + Preconditions.checkArgument( + BUCKET_ORDERING.isOrdered(sortedBuckets), + "buckets must be ordered by interval" + ); + } + + public 
double cost(DataSegment segment) + { + double cost = 0.0; + int index = Collections.binarySearch(intervals, segment.getInterval(), Comparators.intervalsByStartThenEnd()); + index = (index >= 0) ? index : -index - 1; + + for (ListIterator it = sortedBuckets.listIterator(index); it.hasNext(); ) { + Bucket bucket = it.next(); + if (!bucket.inCalculationInterval(segment)) { + break; + } + cost += bucket.cost(segment); + } + + for (ListIterator it = sortedBuckets.listIterator(index); it.hasPrevious(); ) { + Bucket bucket = it.previous(); + if (!bucket.inCalculationInterval(segment)) { + break; + } + cost += bucket.cost(segment); + } + + return cost; + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private NavigableMap buckets = new TreeMap<>(Comparators.intervalsByStartThenEnd()); + + public Builder addSegment(DataSegment segment) + { + Bucket.Builder builder = buckets.computeIfAbsent(getBucketInterval(segment), Bucket::builder); + builder.addSegment(segment); + return this; + } + + public Builder removeSegment(DataSegment segment) + { + Interval interval = getBucketInterval(segment); + buckets.computeIfPresent( + interval, + (i, builder) -> builder.removeSegment(segment).isEmpty() ? 
null : builder + ); + return this; + } + + public boolean isEmpty() + { + return buckets.isEmpty(); + } + + public SegmentsCostCache build() + { + return new SegmentsCostCache( + buckets + .entrySet() + .stream() + .map(entry -> entry.getValue().build()) + .collect(Collectors.toCollection(ArrayList::new)) + ); + } + + private Interval getBucketInterval(DataSegment segment) + { + return BUCKET_GRANULARITY.bucket(segment.getInterval().getStart()); + } + } + + static class Bucket + { + private final Interval interval; + private final Interval calculationInterval; + private final ArrayList sortedSegments; + private final double[] leftSum; + private final double[] rightSum; + + public Bucket(Interval interval, ArrayList sortedSegments, double[] leftSum, double[] rightSum) + { + this.interval = Preconditions.checkNotNull(interval, "interval"); + this.sortedSegments = Preconditions.checkNotNull(sortedSegments, "sortedSegments"); + this.leftSum = Preconditions.checkNotNull(leftSum, "leftSum"); + this.rightSum = Preconditions.checkNotNull(rightSum, "rightSum"); + Preconditions.checkArgument(sortedSegments.size() == leftSum.length && sortedSegments.size() == rightSum.length); + Preconditions.checkArgument(SEGMENT_ORDERING.isOrdered(sortedSegments)); + this.calculationInterval = new Interval( + interval.getStart().minus(LIFE_THRESHOLD), + interval.getEnd().plus(LIFE_THRESHOLD) + ); + } + + public Interval getInterval() + { + return interval; + } + + public boolean inCalculationInterval(DataSegment dataSegment) + { + return calculationInterval.contains(dataSegment.getInterval()); + } + + public double cost(DataSegment dataSegment) + { + // cost is calculated relatively to bucket start (which is considered as 0) + double t0 = convertStart(dataSegment, interval); + double t1 = convertEnd(dataSegment, interval); + + // avoid calculation for segments outside of LIFE_THRESHOLD + if (!inCalculationInterval(dataSegment)) { + throw new ISE("Segment is not within calculation 
interval"); + } + + int index = Collections.binarySearch(sortedSegments, dataSegment, SEGMENT_INTERVAL_COMPARATOR); + index = (index >= 0) ? index : -index - 1; + return addLeftCost(dataSegment, t0, t1, index) + rightCost(dataSegment, t0, t1, index); + } + + private double addLeftCost(DataSegment dataSegment, double t0, double t1, int index) + { + double leftCost = 0.0; + // add to cost all left-overlapping segments + int leftIndex = index - 1; + while (leftIndex >= 0 + && sortedSegments.get(leftIndex).getInterval().overlaps(dataSegment.getInterval())) { + double start = convertStart(sortedSegments.get(leftIndex), interval); + double end = convertEnd(sortedSegments.get(leftIndex), interval); + leftCost += CostBalancerStrategy.intervalCost(end - start, t0 - start, t1 - start); + --leftIndex; + } + // add left-non-overlapping segments + if (leftIndex >= 0) { + leftCost += leftSum[leftIndex] * (FastMath.exp(-t1) - FastMath.exp(-t0)); + } + return leftCost; + } + + private double rightCost(DataSegment dataSegment, double t0, double t1, int index) + { + double rightCost = 0.0; + // add all right-overlapping segments + int rightIndex = index; + while (rightIndex < sortedSegments.size() && + sortedSegments.get(rightIndex).getInterval().overlaps(dataSegment.getInterval())) { + double start = convertStart(sortedSegments.get(rightIndex), interval); + double end = convertEnd(sortedSegments.get(rightIndex), interval); + rightCost += CostBalancerStrategy.intervalCost(t1 - t0, start - t0, end - t0); + ++rightIndex; + } + // add right-non-overlapping segments + if (rightIndex < sortedSegments.size()) { + rightCost += rightSum[rightIndex] * (FastMath.exp(t0) - FastMath.exp(t1)); + } + return rightCost; + } + + private static double convertStart(DataSegment dataSegment, Interval interval) + { + return toLocalInterval(dataSegment.getInterval().getStartMillis(), interval); + } + + private static double convertEnd(DataSegment dataSegment, Interval interval) + { + return 
toLocalInterval(dataSegment.getInterval().getEndMillis(), interval); + } + + private static double toLocalInterval(long millis, Interval interval) + { + return (millis - interval.getStartMillis()) / MILLIS_FACTOR; + } + + public static Builder builder(Interval interval) + { + return new Builder(interval); + } + + static class Builder + { + private final Interval interval; + private final NavigableSet segments = new TreeSet<>(); + + public Builder(Interval interval) + { + this.interval = interval; + } + + public Builder addSegment(DataSegment dataSegment) + { + if (!interval.contains(dataSegment.getInterval().getStartMillis())) { + throw new ISE("Failed to add segment to bucket: interval is not covered by this bucket"); + } + + // all values are pre-computed relatively to bucket start (which is considered as 0) + double t0 = convertStart(dataSegment, interval); + double t1 = convertEnd(dataSegment, interval); + + double leftValue = FastMath.exp(t0) - FastMath.exp(t1); + double rightValue = FastMath.exp(-t1) - FastMath.exp(-t0); + + SegmentAndSum segmentAndSum = new SegmentAndSum(dataSegment, leftValue, rightValue); + + // left/right value should be added to left/right sums for elements greater/lower than current segment + segments.tailSet(segmentAndSum).forEach(v -> v.leftSum += leftValue); + segments.headSet(segmentAndSum).forEach(v -> v.rightSum += rightValue); + + // leftSum_i = leftValue_i + \sum leftValue_j = leftValue_i + leftSum_{i-1} , j < i + SegmentAndSum lower = segments.lower(segmentAndSum); + if (lower != null) { + segmentAndSum.leftSum = leftValue + lower.leftSum; + } + + // rightSum_i = rightValue_i + \sum rightValue_j = rightValue_i + rightSum_{i+1} , j > i + SegmentAndSum higher = segments.higher(segmentAndSum); + if (higher != null) { + segmentAndSum.rightSum = rightValue + higher.rightSum; + } + + if (!segments.add(segmentAndSum)) { + throw new ISE("expect new segment"); + } + return this; + } + + public Builder removeSegment(DataSegment 
dataSegment) + { + SegmentAndSum segmentAndSum = new SegmentAndSum(dataSegment, 0.0, 0.0); + + if (!segments.remove(segmentAndSum)) { + return this; + } + + double t0 = convertStart(dataSegment, interval); + double t1 = convertEnd(dataSegment, interval); + + double leftValue = FastMath.exp(t0) - FastMath.exp(t1); + double rightValue = FastMath.exp(-t1) - FastMath.exp(-t0); + + segments.tailSet(segmentAndSum).forEach(v -> v.leftSum -= leftValue); + segments.headSet(segmentAndSum).forEach(v -> v.rightSum -= rightValue); + return this; + } + + public boolean isEmpty() + { + return segments.isEmpty(); + } + + public Bucket build() + { + ArrayList segmentsList = new ArrayList<>(segments.size()); + double[] leftSum = new double[segments.size()]; + double[] rightSum = new double[segments.size()]; + + int i = 0; + for (SegmentAndSum segmentAndSum : segments) { + segmentsList.add(i, segmentAndSum.dataSegment); + leftSum[i] = segmentAndSum.leftSum; + rightSum[i] = segmentAndSum.rightSum; + ++i; + } + return new Bucket( + new Interval( + interval.getStart(), + segmentsList.get(segments.size() - 1).getInterval().getEnd() + ), + segmentsList, + leftSum, + rightSum + ); + } + } + } + + static class SegmentAndSum implements Comparable + { + private final DataSegment dataSegment; + private double leftSum; + private double rightSum; + + public SegmentAndSum(DataSegment dataSegment, double leftSum, double rightSum) + { + this.dataSegment = dataSegment; + this.leftSum = leftSum; + this.rightSum = rightSum; + } + + @Override + public int compareTo(SegmentAndSum o) + { + int c = Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), o.dataSegment.getInterval()); + return (c != 0) ? 
c : dataSegment.compareTo(o.dataSegment); + } + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/cost/ServerCostCache.java b/server/src/main/java/io/druid/server/coordinator/cost/ServerCostCache.java new file mode 100644 index 000000000000..14505d7bcd3c --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/cost/ServerCostCache.java @@ -0,0 +1,99 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordinator.cost; + +import com.google.common.base.Preconditions; +import io.druid.timeline.DataSegment; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class ServerCostCache +{ + private final SegmentsCostCache allSegmentsCostCache; + private final Map segmentsPerDataSource; + + public ServerCostCache( + SegmentsCostCache allSegmentsCostCache, + Map segmentsCostPerDataSource + ) + { + this.allSegmentsCostCache = Preconditions.checkNotNull(allSegmentsCostCache); + this.segmentsPerDataSource = Preconditions.checkNotNull(segmentsCostPerDataSource); + } + + public double computeCost(DataSegment segment) + { + return allSegmentsCostCache.cost(segment) + computeDataSourceCost(segment); + } + + private double computeDataSourceCost(DataSegment segment) + { + SegmentsCostCache costCache = segmentsPerDataSource.get(segment.getDataSource()); + return (costCache == null) ? 0.0 : costCache.cost(segment); + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private final SegmentsCostCache.Builder allSegmentsCostCache = SegmentsCostCache.builder(); + private final Map segmentsPerDataSource = new HashMap<>(); + + public Builder addSegment(DataSegment dataSegment) + { + allSegmentsCostCache.addSegment(dataSegment); + segmentsPerDataSource + .computeIfAbsent(dataSegment.getDataSource(), d -> SegmentsCostCache.builder()) + .addSegment(dataSegment); + return this; + } + + public Builder removeSegment(DataSegment dataSegment) + { + allSegmentsCostCache.removeSegment(dataSegment); + segmentsPerDataSource.computeIfPresent( + dataSegment.getDataSource(), + (ds, builder) -> builder.removeSegment(dataSegment).isEmpty() ? 
null : builder + ); + return this; + } + + public boolean isEmpty() + { + return allSegmentsCostCache.isEmpty(); + } + + public ServerCostCache build() + { + return new ServerCostCache( + allSegmentsCostCache.build(), + segmentsPerDataSource + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build())) + ); + } + } +} diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java index 936e538a8113..62110511dbd6 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java @@ -239,7 +239,7 @@ public QueryRunner getQueryRunner(DruidServer server) } @Override - public void registerServerCallback(Executor exec, ServerCallback callback) + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) { } diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java index a68e5c573fe7..6c27ff86ddf4 100644 --- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java @@ -2734,7 +2734,7 @@ public void registerTimelineCallback(final Executor exec, final TimelineCallback } @Override - public void registerServerCallback(Executor exec, ServerCallback callback) + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) { } diff --git a/server/src/test/java/io/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java new file mode 100644 index 000000000000..a24aeeec56f8 --- /dev/null +++ 
b/server/src/test/java/io/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.cost; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.DruidServer; +import io.druid.concurrent.Execs; +import io.druid.server.coordination.ServerType; +import io.druid.server.coordinator.CachingCostBalancerStrategy; +import io.druid.server.coordinator.CostBalancerStrategy; +import io.druid.server.coordinator.LoadQueuePeonTester; +import io.druid.server.coordinator.ServerHolder; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class CachingCostBalancerStrategyTest +{ + private static final int DAYS_IN_MONTH = 30; + private 
static final int SEGMENT_SIZE = 100; + private static final int NUMBER_OF_SEGMENTS_ON_SERVER = 10000; + private static final int NUMBER_OF_QUERIES = 1000; + private static final int NUMBER_OF_SERVERS = 3; + + private List serverHolderList; + private List segmentQueries; + private ListeningExecutorService executorService; + + @Before + public void setUp() throws Exception + { + Random random = new Random(0); + DateTime referenceTime = new DateTime("2014-01-01T00:00:00"); + + serverHolderList = IntStream + .range(0, NUMBER_OF_SERVERS) + .mapToObj(i -> + createServerHolder( + String.valueOf(i), + String.valueOf(i), + SEGMENT_SIZE * (NUMBER_OF_SEGMENTS_ON_SERVER + NUMBER_OF_QUERIES), + NUMBER_OF_SEGMENTS_ON_SERVER, + random, + referenceTime + ) + ) + .collect(Collectors.toList()); + + segmentQueries = createDataSegments(NUMBER_OF_QUERIES, random, referenceTime); + executorService = MoreExecutors.listeningDecorator(Execs.singleThreaded("")); + } + + @After + public void tearDown() throws Exception + { + executorService.shutdownNow(); + } + + @Test + public void decisionTest() throws Exception + { + CachingCostBalancerStrategy cachingCostBalancerStrategy = createCachingCostBalancerStrategy( + serverHolderList, + executorService + ); + CostBalancerStrategy costBalancerStrategy = createCostBalancerStrategy(executorService); + int notEqual = segmentQueries + .stream() + .mapToInt( + s -> { + ServerHolder s1 = cachingCostBalancerStrategy.findNewSegmentHomeBalancer(s, serverHolderList); + ServerHolder s2 = costBalancerStrategy.findNewSegmentHomeBalancer(s, serverHolderList); + return (s1.getServer().getName().equals(s2.getServer().getName())) ? 
0 : 1; + } + ) + .sum(); + Assert.assertTrue(((double) notEqual / (double) segmentQueries.size()) < 0.01); + } + + private CachingCostBalancerStrategy createCachingCostBalancerStrategy( + List serverHolders, + ListeningExecutorService listeningExecutorService + ) + { + ClusterCostCache.Builder builder = ClusterCostCache.builder(); + serverHolders.forEach( + s -> s.getServer().getSegments().values().forEach( + segment -> builder.addSegment(s.getServer().getName(), segment) + ) + ); + return new CachingCostBalancerStrategy(builder.build(), listeningExecutorService); + } + + private CostBalancerStrategy createCostBalancerStrategy(ListeningExecutorService listeningExecutorService) + { + return new CostBalancerStrategy(listeningExecutorService); + } + + private ServerHolder createServerHolder( + String name, + String host, + int maxSize, + int numberOfSegments, + Random random, + DateTime referenceTime + ) + { + DruidServer druidServer = new DruidServer(name, host, null, maxSize, ServerType.HISTORICAL, "normal", 0); + createDataSegments(numberOfSegments, random, referenceTime) + .forEach(segment -> druidServer.addDataSegment(segment.getIdentifier(), segment)); + return new ServerHolder( + druidServer.toImmutableDruidServer(), + new LoadQueuePeonTester() + ); + } + + private List createDataSegments( + int numberOfSegments, + Random random, + DateTime referenceTime + ) + { + return new ArrayList<>( + IntStream + .range(0, numberOfSegments) + .mapToObj(i -> createRandomSegment(random, referenceTime)) + .collect(Collectors.toSet()) + ); + } + + private DataSegment createRandomSegment(Random random, DateTime referenceTime) + { + int timeShift = random.nextInt((int) TimeUnit.DAYS.toHours(DAYS_IN_MONTH * 12)); + return new DataSegment( + String.valueOf(random.nextInt(50)), + new Interval(referenceTime.plusHours(timeShift), referenceTime.plusHours(timeShift + 1)), + "version", + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + null, + 0, + 100 + 
); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/cost/SegmentsCostCacheTest.java b/server/src/test/java/io/druid/server/coordinator/cost/SegmentsCostCacheTest.java new file mode 100644 index 000000000000..de97ec26f7f9 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/cost/SegmentsCostCacheTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server.coordinator.cost; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.druid.timeline.DataSegment; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SegmentsCostCacheTest +{ + + private static final String DATA_SOURCE = "dataSource"; + private static final DateTime REFERENCE_TIME = new DateTime("2014-01-01T00:00:00"); + private static final double EPSILON = 0.00000001; + + @Test + public void segmentCacheTest() + { + SegmentsCostCache.Builder cacheBuilder = SegmentsCostCache.builder(); + cacheBuilder.addSegment(createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100)); + SegmentsCostCache cache = cacheBuilder.build(); + assertEquals( + 7.8735899489011E-4, + cache.cost(createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, -2), 100)), + EPSILON + ); + } + + @Test + public void notInCalculationIntervalCostTest() + { + SegmentsCostCache.Builder cacheBuilder = SegmentsCostCache.builder(); + cacheBuilder.addSegment( + createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100) + ); + SegmentsCostCache cache = cacheBuilder.build(); + assertEquals( + 0, + cache.cost(createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, (int) TimeUnit.DAYS.toHours(50)), 100)), + EPSILON + ); + } + + @Test + public void twoSegmentsCostTest() + { + DataSegment segmentA = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + DataSegment segmentB = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, -2), 100); + + SegmentsCostCache.Bucket.Builder prototype = SegmentsCostCache.Bucket.builder(new Interval( + 
REFERENCE_TIME.minusHours(5), + REFERENCE_TIME.plusHours(5) + )); + + prototype.addSegment(segmentA); + SegmentsCostCache.Bucket bucket = prototype.build(); + + double segmentCost = bucket.cost(segmentB); + assertEquals(7.8735899489011E-4, segmentCost, EPSILON); + } + + @Test + public void calculationIntervalTest() + { + DataSegment segmentA = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + DataSegment segmentB = createSegment( + DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, (int) TimeUnit.DAYS.toHours(50)), 100 + ); + + SegmentsCostCache.Bucket.Builder prototype = SegmentsCostCache.Bucket.builder(new Interval( + REFERENCE_TIME.minusHours(5), + REFERENCE_TIME.plusHours(5) + )); + prototype.addSegment(segmentA); + SegmentsCostCache.Bucket bucket = prototype.build(); + + assertTrue(bucket.inCalculationInterval(segmentA)); + assertFalse(bucket.inCalculationInterval(segmentB)); + } + + @Test + public void sameSegmentCostTest() + { + DataSegment segmentA = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + DataSegment segmentB = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + + SegmentsCostCache.Bucket.Builder prototype = SegmentsCostCache.Bucket.builder(new Interval( + REFERENCE_TIME.minusHours(5), + REFERENCE_TIME.plusHours(5) + )); + + prototype.addSegment(segmentA); + SegmentsCostCache.Bucket bucket = prototype.build(); + + double segmentCost = bucket.cost(segmentB); + assertEquals(8.26147353873985E-4, segmentCost, EPSILON); + } + + @Test + public void multipleSegmentsCostTest() + { + DataSegment segmentA = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, -2), 100); + DataSegment segmentB = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 0), 100); + DataSegment segmentC = createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, 2), 100); + + SegmentsCostCache.Bucket.Builder prototype = SegmentsCostCache.Bucket.builder(new Interval( + REFERENCE_TIME.minusHours(5), + 
REFERENCE_TIME.plusHours(5) + )); + + prototype.addSegment(segmentA); + prototype.addSegment(segmentC); + SegmentsCostCache.Bucket bucket = prototype.build(); + + double segmentCost = bucket.cost(segmentB); + + assertEquals(0.001574717989780039, segmentCost, EPSILON); + } + + @Test + public void randomSegmentsCostTest() + { + List dataSegments = new ArrayList<>(1000); + Random random = new Random(1); + for (int i = 0; i < 1000; ++i) { + dataSegments.add(createSegment(DATA_SOURCE, shifted1HInterval(REFERENCE_TIME, random.nextInt(20)), 100)); + } + + DataSegment referenceSegment = createSegment("ANOTHER_DATA_SOURCE", shifted1HInterval(REFERENCE_TIME, 5), 100); + + SegmentsCostCache.Bucket.Builder prototype = SegmentsCostCache.Bucket.builder(new Interval( + REFERENCE_TIME.minusHours(1), + REFERENCE_TIME.plusHours(25) + )); + dataSegments.forEach(prototype::addSegment); + SegmentsCostCache.Bucket bucket = prototype.build(); + + double cost = bucket.cost(referenceSegment); + assertEquals(0.7065117101966677, cost, EPSILON); + } + + public static Interval shifted1HInterval(DateTime REFERENCE_TIME, int shiftInHours) + { + return new Interval( + REFERENCE_TIME.plusHours(shiftInHours), + REFERENCE_TIME.plusHours(shiftInHours + 1) + ); + } + + public static DataSegment createSegment(String dataSource, Interval interval, long size) + { + return new DataSegment( + dataSource, + interval, + UUID.randomUUID().toString(), + Maps.newConcurrentMap(), + Lists.newArrayList(), + Lists.newArrayList(), + null, + 0, + size + ); + } +} diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index efadc9a8ad5d..8cd45b59e45c 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -98,8 +98,8 @@ public void configure(Binder binder) private static class NoopServerView implements ServerView { @Override - public void 
registerServerCallback( - Executor exec, ServerCallback callback + public void registerServerRemovedCallback( + Executor exec, ServerRemovedCallback callback ) { // do nothing diff --git a/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java index 08888f1184df..2919e9076f8e 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/TestServerInventoryView.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableList; import io.druid.client.DruidServer; -import io.druid.client.ServerView; import io.druid.client.TimelineServerView; import io.druid.client.selector.ServerSelector; import io.druid.query.DataSource; @@ -85,10 +84,7 @@ public QueryRunner getQueryRunner(DruidServer server) } @Override - public void registerServerCallback( - Executor exec, - ServerView.ServerCallback callback - ) + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) { // Do nothing } From c9a2238d46f0c9bf9f2acd58a03e74529e04a9ac Mon Sep 17 00:00:00 2001 From: leventov Date: Wed, 30 Aug 2017 14:26:33 -0500 Subject: [PATCH 2/5] Fix benchmark units --- .../coordinator/CachingCostBalancerStrategyBenchmark.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/benchmarks/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyBenchmark.java b/benchmarks/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyBenchmark.java index 85d652dbfeb7..5ee4e3543b83 100644 --- a/benchmarks/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyBenchmark.java +++ b/benchmarks/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyBenchmark.java @@ -28,11 +28,13 @@ import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; +import 
org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; import java.util.Collections; import java.util.HashSet; @@ -43,7 +45,9 @@ @State(Scope.Benchmark) @BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.MILLISECONDS) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@Warmup(iterations = 5) +@Measurement(iterations = 10) @Fork(1) public class CachingCostBalancerStrategyBenchmark { From af556133e4cb4f3f9113b6cc4eec887927f0721d Mon Sep 17 00:00:00 2001 From: leventov Date: Wed, 30 Aug 2017 15:47:45 -0500 Subject: [PATCH 3/5] Style, forbidden-api, review, bug fixes --- .../coordinator/BalancerStrategyFactory.java | 1 + .../CachingCostBalancerStrategy.java | 4 +- .../CachingCostBalancerStrategyFactory.java | 10 ++-- .../coordinator/cost/ClusterCostCache.java | 2 +- .../coordinator/cost/SegmentsCostCache.java | 47 +++++++++++-------- .../coordinator/cost/ServerCostCache.java | 4 +- .../server/coordinator/cost/package-info.java | 23 +++++++++ .../cost/CachingCostBalancerStrategyTest.java | 3 +- .../cost/SegmentsCostCacheTest.java | 3 +- .../java/io/druid/cli/CliRealtimeExample.java | 8 +--- 10 files changed, 65 insertions(+), 40 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordinator/cost/package-info.java diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java index 011b63f59088..879201e93392 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java @@ -26,6 +26,7 @@ @JsonSubTypes(value = { @JsonSubTypes.Type(name = "diskNormalized", value = 
DiskNormalizedCostBalancerStrategyFactory.class), @JsonSubTypes.Type(name = "cost", value = CostBalancerStrategyFactory.class), + @JsonSubTypes.Type(name = "cachingCost", value = CachingCostBalancerStrategyFactory.class), @JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class), }) public interface BalancerStrategyFactory diff --git a/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategy.java index fa01ca58fefb..b9b9adb5d38c 100644 --- a/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategy.java @@ -40,9 +40,7 @@ public CachingCostBalancerStrategy(ClusterCostCache clusterCostCache, ListeningE } @Override - protected double computeCost( - DataSegment proposalSegment, ServerHolder server, boolean includeCurrentServer - ) + protected double computeCost(DataSegment proposalSegment, ServerHolder server, boolean includeCurrentServer) { final long proposalSegmentSize = proposalSegment.getSize(); diff --git a/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java index cd0a1e7960d0..6227489f7384 100644 --- a/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java @@ -50,9 +50,9 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto private final ServerInventoryView serverInventoryView; private final LifecycleLock lifecycleLock = new LifecycleLock(); - private volatile boolean initialized = false; private final ExecutorService executor = Execs.singleThreaded("CachingCostBalancerStrategy-executor"); private final ClusterCostCache.Builder clusterCostCacheBuilder 
= ClusterCostCache.builder(); + private volatile boolean initialized = false; @Inject public CachingCostBalancerStrategyFactory(ServerInventoryView serverInventoryView) @@ -72,18 +72,14 @@ public void start() new ServerView.SegmentCallback() { @Override - public ServerView.CallbackAction segmentAdded( - DruidServerMetadata server, DataSegment segment - ) + public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment) { clusterCostCacheBuilder.addSegment(server.getName(), segment); return ServerView.CallbackAction.CONTINUE; } @Override - public ServerView.CallbackAction segmentRemoved( - DruidServerMetadata server, DataSegment segment - ) + public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) { clusterCostCacheBuilder.removeSegment(server.getName(), segment); return ServerView.CallbackAction.CONTINUE; diff --git a/server/src/main/java/io/druid/server/coordinator/cost/ClusterCostCache.java b/server/src/main/java/io/druid/server/coordinator/cost/ClusterCostCache.java index b8bab1282642..fe803fc84ac9 100644 --- a/server/src/main/java/io/druid/server/coordinator/cost/ClusterCostCache.java +++ b/server/src/main/java/io/druid/server/coordinator/cost/ClusterCostCache.java @@ -31,7 +31,7 @@ public class ClusterCostCache { private final Map serversCostCache; - public ClusterCostCache(Map serversCostCache) + ClusterCostCache(Map serversCostCache) { this.serversCostCache = Preconditions.checkNotNull(serversCostCache); } diff --git a/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java b/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java index f5d61fb85415..140b180a6c3b 100644 --- a/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java +++ b/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Ordering; import 
io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; import io.druid.java.util.common.granularity.DurationGranularity; import io.druid.java.util.common.guava.Comparators; import io.druid.server.coordinator.CostBalancerStrategy; @@ -123,7 +124,7 @@ public class SegmentsCostCache private final ArrayList sortedBuckets; private final ArrayList intervals; - public SegmentsCostCache(ArrayList sortedBuckets) + SegmentsCostCache(ArrayList sortedBuckets) { this.sortedBuckets = Preconditions.checkNotNull(sortedBuckets, "buckets should not be null"); this.intervals = sortedBuckets.stream().map(Bucket::getInterval).collect(Collectors.toCollection(ArrayList::new)); @@ -200,7 +201,7 @@ public SegmentsCostCache build() ); } - private Interval getBucketInterval(DataSegment segment) + private static Interval getBucketInterval(DataSegment segment) { return BUCKET_GRANULARITY.bucket(segment.getInterval().getStart()); } @@ -214,7 +215,7 @@ static class Bucket private final double[] leftSum; private final double[] rightSum; - public Bucket(Interval interval, ArrayList sortedSegments, double[] leftSum, double[] rightSum) + Bucket(Interval interval, ArrayList sortedSegments, double[] leftSum, double[] rightSum) { this.interval = Preconditions.checkNotNull(interval, "interval"); this.sortedSegments = Preconditions.checkNotNull(sortedSegments, "sortedSegments"); @@ -228,17 +229,17 @@ public Bucket(Interval interval, ArrayList sortedSegments, double[] ); } - public Interval getInterval() + Interval getInterval() { return interval; } - public boolean inCalculationInterval(DataSegment dataSegment) + boolean inCalculationInterval(DataSegment dataSegment) { - return calculationInterval.contains(dataSegment.getInterval()); + return calculationInterval.overlaps(dataSegment.getInterval()); } - public double cost(DataSegment dataSegment) + double cost(DataSegment dataSegment) { // cost is calculated relatively to bucket start (which is considered as 0) double t0 = 
convertStart(dataSegment, interval); @@ -388,23 +389,19 @@ public Bucket build() ArrayList segmentsList = new ArrayList<>(segments.size()); double[] leftSum = new double[segments.size()]; double[] rightSum = new double[segments.size()]; - int i = 0; for (SegmentAndSum segmentAndSum : segments) { - segmentsList.add(i, segmentAndSum.dataSegment); + segmentsList.add(segmentAndSum.dataSegment); leftSum[i] = segmentAndSum.leftSum; rightSum[i] = segmentAndSum.rightSum; ++i; } - return new Bucket( - new Interval( - interval.getStart(), - segmentsList.get(segments.size() - 1).getInterval().getEnd() - ), - segmentsList, - leftSum, - rightSum - ); + long bucketEndMillis = segmentsList + .stream() + .mapToLong(s -> s.getInterval().getEndMillis()) + .max() + .orElseGet(interval::getEndMillis); + return new Bucket(Intervals.utc(interval.getStartMillis(), bucketEndMillis), segmentsList, leftSum, rightSum); } } } @@ -415,7 +412,7 @@ static class SegmentAndSum implements Comparable private double leftSum; private double rightSum; - public SegmentAndSum(DataSegment dataSegment, double leftSum, double rightSum) + SegmentAndSum(DataSegment dataSegment, double leftSum, double rightSum) { this.dataSegment = dataSegment; this.leftSum = leftSum; @@ -428,5 +425,17 @@ public int compareTo(SegmentAndSum o) int c = Comparators.intervalsByStartThenEnd().compare(dataSegment.getInterval(), o.dataSegment.getInterval()); return (c != 0) ? 
c : dataSegment.compareTo(o.dataSegment); } + + @Override + public boolean equals(Object obj) + { + throw new UnsupportedOperationException("Use SegmentAndSum.compareTo()"); + } + + @Override + public int hashCode() + { + throw new UnsupportedOperationException(); + } } } diff --git a/server/src/main/java/io/druid/server/coordinator/cost/ServerCostCache.java b/server/src/main/java/io/druid/server/coordinator/cost/ServerCostCache.java index 14505d7bcd3c..6991ccd43d0b 100644 --- a/server/src/main/java/io/druid/server/coordinator/cost/ServerCostCache.java +++ b/server/src/main/java/io/druid/server/coordinator/cost/ServerCostCache.java @@ -31,7 +31,7 @@ public class ServerCostCache private final SegmentsCostCache allSegmentsCostCache; private final Map segmentsPerDataSource; - public ServerCostCache( + ServerCostCache( SegmentsCostCache allSegmentsCostCache, Map segmentsCostPerDataSource ) @@ -40,7 +40,7 @@ public ServerCostCache( this.segmentsPerDataSource = Preconditions.checkNotNull(segmentsCostPerDataSource); } - public double computeCost(DataSegment segment) + double computeCost(DataSegment segment) { return allSegmentsCostCache.cost(segment) + computeDataSourceCost(segment); } diff --git a/server/src/main/java/io/druid/server/coordinator/cost/package-info.java b/server/src/main/java/io/druid/server/coordinator/cost/package-info.java new file mode 100644 index 000000000000..96aea759ed76 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/cost/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +@ParametersAreNonnullByDefault +package io.druid.server.coordinator.cost; + +import javax.annotation.ParametersAreNonnullByDefault; diff --git a/server/src/test/java/io/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java index a24aeeec56f8..67544c60bb81 100644 --- a/server/src/test/java/io/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java +++ b/server/src/test/java/io/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java @@ -23,6 +23,7 @@ import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.DruidServer; import io.druid.concurrent.Execs; +import io.druid.java.util.common.DateTimes; import io.druid.server.coordination.ServerType; import io.druid.server.coordinator.CachingCostBalancerStrategy; import io.druid.server.coordinator.CostBalancerStrategy; @@ -60,7 +61,7 @@ public class CachingCostBalancerStrategyTest public void setUp() throws Exception { Random random = new Random(0); - DateTime referenceTime = new DateTime("2014-01-01T00:00:00"); + DateTime referenceTime = DateTimes.of("2014-01-01T00:00:00"); serverHolderList = IntStream .range(0, NUMBER_OF_SERVERS) diff --git a/server/src/test/java/io/druid/server/coordinator/cost/SegmentsCostCacheTest.java b/server/src/test/java/io/druid/server/coordinator/cost/SegmentsCostCacheTest.java index de97ec26f7f9..c530847ffdf1 100644 --- a/server/src/test/java/io/druid/server/coordinator/cost/SegmentsCostCacheTest.java +++ 
b/server/src/test/java/io/druid/server/coordinator/cost/SegmentsCostCacheTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.druid.java.util.common.DateTimes; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -40,7 +41,7 @@ public class SegmentsCostCacheTest { private static final String DATA_SOURCE = "dataSource"; - private static final DateTime REFERENCE_TIME = new DateTime("2014-01-01T00:00:00"); + private static final DateTime REFERENCE_TIME = DateTimes.of("2014-01-01T00:00:00"); private static final double EPSILON = 0.00000001; @Test diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 8cd45b59e45c..29c08235de66 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -98,17 +98,13 @@ public void configure(Binder binder) private static class NoopServerView implements ServerView { @Override - public void registerServerRemovedCallback( - Executor exec, ServerRemovedCallback callback - ) + public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) { // do nothing } @Override - public void registerSegmentCallback( - Executor exec, SegmentCallback callback - ) + public void registerSegmentCallback(Executor exec, SegmentCallback callback) { // do nothing } From 4af2b348eb30cae243bb6daa01a4788e351a6d2d Mon Sep 17 00:00:00 2001 From: leventov Date: Wed, 30 Aug 2017 17:43:03 -0500 Subject: [PATCH 4/5] Add docs --- docs/content/configuration/coordinator.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/configuration/coordinator.md b/docs/content/configuration/coordinator.md index 5a38a338fb1a..56c1c33b1390 100644 --- a/docs/content/configuration/coordinator.md +++ b/docs/content/configuration/coordinator.md @@ -33,7 +33,7 
@@ The coordinator node uses several of the global configs in [Configuration](../co |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)| |`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)| |`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0| -|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`| +|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters and will replace `cost` in future versions; users are invited to try it. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`| |`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon , which manages the load and drop of segments.|PT0.050S (50 ms)| |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this coordinator node should act like an overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone overlord nodes.
If set to true, then overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false| |`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord nodes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL| From 6c4f7eec378bdc519ed1da83dac95be843684a01 Mon Sep 17 00:00:00 2001 From: leventov Date: Tue, 5 Sep 2017 23:07:46 -0500 Subject: [PATCH 5/5] Address comments --- .../client/AbstractCuratorServerInventoryView.java | 12 ++++++------ .../CachingCostBalancerStrategyFactory.java | 2 ++ .../server/coordinator/cost/SegmentsCostCache.java | 2 ++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java index 8b6785a95fc3..5026130f175f 100644 --- a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java +++ b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java @@ -52,7 +52,7 @@ public abstract class AbstractCuratorServerInventoryView implemen private final CuratorInventoryManager inventoryManager; private final AtomicBoolean started = new AtomicBoolean(false); - private final ConcurrentMap serverCallbacks = new MapMaker().makeMap(); + private final ConcurrentMap serverRemovedCallbacks = new MapMaker().makeMap(); private final ConcurrentMap segmentCallbacks = new MapMaker().makeMap(); public AbstractCuratorServerInventoryView( @@ -118,7 +118,7 @@ public void newContainer(DruidServer container) public void deadContainer(DruidServer deadContainer) { log.info("Server Disappeared[%s]", deadContainer); - runServerCallbacks(deadContainer); + runServerRemovedCallbacks(deadContainer); } @Override @@ -212,7 +212,7 @@ public Iterable 
getInventory() @Override public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) { - serverCallbacks.put(callback, exec); + serverRemovedCallbacks.put(callback, exec); } @Override @@ -247,9 +247,9 @@ public void run() } } - protected void runServerCallbacks(final DruidServer server) + private void runServerRemovedCallbacks(final DruidServer server) { - for (final Map.Entry entry : serverCallbacks.entrySet()) { + for (final Map.Entry entry : serverRemovedCallbacks.entrySet()) { entry.getValue().execute( new Runnable() { @@ -257,7 +257,7 @@ protected void runServerCallbacks(final DruidServer server) public void run() { if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { - serverCallbacks.remove(entry.getKey()); + serverRemovedCallbacks.remove(entry.getKey()); } } } diff --git a/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java index 6227489f7384..18ed2e4b214c 100644 --- a/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/CachingCostBalancerStrategyFactory.java @@ -50,6 +50,7 @@ public class CachingCostBalancerStrategyFactory implements BalancerStrategyFacto private final ServerInventoryView serverInventoryView; private final LifecycleLock lifecycleLock = new LifecycleLock(); + /** Must be single-threaded, because {@link ClusterCostCache.Builder} and downstream builders are not thread-safe */ private final ExecutorService executor = Execs.singleThreaded("CachingCostBalancerStrategy-executor"); private final ClusterCostCache.Builder clusterCostCacheBuilder = ClusterCostCache.builder(); private volatile boolean initialized = false; @@ -116,6 +117,7 @@ public void stop() throw new ISE("CachingCostBalancerStrategyFactory can not be stopped"); } executor.shutdownNow(); + // Not calling 
lifecycleLock.exitStop() because CachingCostBalancerStrategyFactory is not recyclable } @Override diff --git a/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java b/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java index 140b180a6c3b..a25ea2d94add 100644 --- a/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java +++ b/server/src/main/java/io/druid/server/coordinator/cost/SegmentsCostCache.java @@ -180,6 +180,8 @@ public Builder removeSegment(DataSegment segment) Interval interval = getBucketInterval(segment); buckets.computeIfPresent( interval, + // If there are no more segments, returning null in computeIfPresent() removes the interval from the buckets + // map (i, builder) -> builder.removeSegment(segment).isEmpty() ? null : builder ); return this;