Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator;

import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.coordinator.cost.SegmentsCostCache;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.util.Collections;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
@Fork(1)
public class CachingCostBalancerStrategyBenchmark
{
  private static final Logger log = new Logger(CachingCostBalancerStrategyBenchmark.class);

  private static final int NUMBER_OF_SEGMENTS = 100000;
  private static final int NUMBER_OF_QUERIES = 500;

  private static final long DAYS_IN_MONTH = 30;

  private final DateTime referenceTime = DateTimes.of("2014-01-01T00:00:00");
  private final Set<DataSegment> segments = new HashSet<>();
  private final Set<DataSegment> segmentQueries = new HashSet<>();
  // Random but fixed per benchmark trial, so both measured strategies see the same segment layout.
  private final int seed = ThreadLocalRandom.current().nextInt();

  private SegmentsCostCache segmentsCostCache;

  /**
   * Generates {@link #NUMBER_OF_SEGMENTS} hour-granularity segments spread over one month,
   * builds the {@link SegmentsCostCache} over them, and generates {@link #NUMBER_OF_QUERIES}
   * query segments drawn from the same distribution.
   */
  @Setup
  public void createSegments()
  {
    Random random = new Random(seed);
    SegmentsCostCache.Builder prototype = SegmentsCostCache.builder();

    for (int i = 0; i < NUMBER_OF_SEGMENTS; ++i) {
      DataSegment segment = createSegment(random.nextInt((int) TimeUnit.DAYS.toHours(DAYS_IN_MONTH)));
      segments.add(segment);
      prototype.addSegment(segment);
    }
    // BUG FIX: build() must run AFTER the segments are registered with the builder.
    // Previously the cache was built first and segments were added to the builder afterwards,
    // so measureCachingCostStrategy() benchmarked an empty cache.
    segmentsCostCache = prototype.build();

    for (int i = 0; i < NUMBER_OF_QUERIES; ++i) {
      DataSegment segment = createSegment(random.nextInt((int) TimeUnit.DAYS.toHours(DAYS_IN_MONTH)));
      segmentQueries.add(segment);
    }

    log.info("GENERATING SEGMENTS : %d / %d", NUMBER_OF_SEGMENTS, NUMBER_OF_QUERIES);
  }

  /**
   * Baseline: joint cost of each query segment against the full segment set,
   * recomputed from scratch on every call.
   */
  @Benchmark
  public double measureCostStrategy()
  {
    double cost = 0.0;
    for (DataSegment segment : segmentQueries) {
      cost += CostBalancerStrategy.computeJointSegmentsCost(segment, segments);
    }
    return cost;
  }

  /**
   * Candidate: same cost query answered from the prebuilt {@link SegmentsCostCache}.
   */
  @Benchmark
  public double measureCachingCostStrategy()
  {
    double cost = 0.0;
    for (DataSegment segment : segmentQueries) {
      cost += segmentsCostCache.cost(segment);
    }
    return cost;
  }

  /**
   * Creates a one-hour segment starting {@code shift} hours after {@link #referenceTime}.
   */
  private DataSegment createSegment(int shift)
  {
    return new DataSegment(
        "dataSource",
        new Interval(referenceTime.plusHours(shift), referenceTime.plusHours(shift).plusHours(1)),
        "version",
        Collections.<String, Object>emptyMap(),
        Collections.<String>emptyList(),
        Collections.<String>emptyList(),
        null,
        0,
        100
    );
  }
}
2 changes: 1 addition & 1 deletion docs/content/configuration/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ The coordinator node uses several of the global configs in [Configuration](../co
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)|
|`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters and will replace `cost` in future versions; users are invited to try it. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|
|`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon , which manages the load and drop of segments.|PT0.050S (50 ms)|
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this coordinator node should act like an overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone overlord nodes. If set to true, then overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord nodes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void registerSegmentCallback(
}

@Override
public void registerServerCallback(Executor exec, ServerView.ServerCallback callback)
public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback)
{
// No-op
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public abstract class AbstractCuratorServerInventoryView<InventoryType> implemen
private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);

private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<ServerRemovedCallback, Executor> serverRemovedCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();

public AbstractCuratorServerInventoryView(
Expand Down Expand Up @@ -118,7 +118,7 @@ public void newContainer(DruidServer container)
public void deadContainer(DruidServer deadContainer)
{
log.info("Server Disappeared[%s]", deadContainer);
runServerCallbacks(deadContainer);
runServerRemovedCallbacks(deadContainer);
}

@Override
Expand Down Expand Up @@ -210,9 +210,9 @@ public Iterable<DruidServer> getInventory()
}

@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
serverCallbacks.put(callback, exec);
serverRemovedCallbacks.put(callback, exec);
}

@Override
Expand Down Expand Up @@ -247,17 +247,17 @@ public void run()
}
}

protected void runServerCallbacks(final DruidServer server)
private void runServerRemovedCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverCallbacks.remove(entry.getKey());
serverRemovedCallbacks.remove(entry.getKey());
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions server/src/main/java/io/druid/client/BrokerServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ public CallbackAction segmentViewInitialized()
segmentFilter
);

baseView.registerServerCallback(
baseView.registerServerRemovedCallback(
exec,
new ServerView.ServerCallback()
new ServerRemovedCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
Expand Down Expand Up @@ -322,9 +322,9 @@ public <T> QueryRunner<T> getQueryRunner(DruidServer server)
}

@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
baseView.registerServerCallback(exec, callback);
baseView.registerServerRemovedCallback(exec, callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ public ServerView.CallbackAction segmentViewInitialized()
}
);

baseView.registerServerCallback(
baseView.registerServerRemovedCallback(
exec,
new ServerView.ServerCallback()
new ServerView.ServerRemovedCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ public void registerSegmentCallback(
Executor exec, ServerView.SegmentCallback callback, Predicate<Pair<DruidServerMetadata, DataSegment>> filter
);

public void registerServerCallback(Executor exec, ServerView.ServerCallback callback);
public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer

private final LifecycleLock lifecycleLock = new LifecycleLock();

private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<ServerRemovedCallback, Executor> serverCallbacks = new MapMaker().makeMap();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new MapMaker().makeMap();

private final ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new MapMaker()
Expand Down Expand Up @@ -266,7 +266,7 @@ public void registerSegmentCallback(
}

@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
serverCallbacks.put(callback, exec);
}
Expand Down Expand Up @@ -330,7 +330,7 @@ public void run()

private void runServerCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/io/druid/client/ServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public interface ServerView
{
public void registerServerCallback(Executor exec, ServerCallback callback);
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback);
public void registerSegmentCallback(Executor exec, SegmentCallback callback);

public enum CallbackAction
Expand All @@ -37,7 +37,7 @@ public enum CallbackAction
UNREGISTER,
}

public static interface ServerCallback
public static interface ServerRemovedCallback
{
/**
* Called when a server is removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "diskNormalized", value = DiskNormalizedCostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "cost", value = CostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "cachingCost", value = CachingCostBalancerStrategyFactory.class),
@JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class),
})
public interface BalancerStrategyFactory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.server.coordinator.cost.ClusterCostCache;
import io.druid.timeline.DataSegment;

import java.util.Set;


public class CachingCostBalancerStrategy extends CostBalancerStrategy
{
  private final ClusterCostCache clusterCostCache;

  public CachingCostBalancerStrategy(ClusterCostCache clusterCostCache, ListeningExecutorService exec)
  {
    super(exec);
    this.clusterCostCache = Preconditions.checkNotNull(clusterCostCache);
  }

  /**
   * Computes the placement cost of {@code proposalSegment} on {@code server} using the
   * precomputed {@link ClusterCostCache} rather than scanning every served segment.
   * Returns {@link Double#POSITIVE_INFINITY} for servers that are ineligible.
   */
  @Override
  protected double computeCost(DataSegment proposalSegment, ServerHolder server, boolean includeCurrentServer)
  {
    // (optional) Skip servers already serving the segment, unless the caller wants them included.
    if (!includeCurrentServer && server.isServingSegment(proposalSegment)) {
      return Double.POSITIVE_INFINITY;
    }

    // Skip servers that lack the space for the segment, or that are already loading it.
    if (proposalSegment.getSize() > server.getAvailableSize() || server.isLoadingSegment(proposalSegment)) {
      return Double.POSITIVE_INFINITY;
    }

    final String targetServerName = server.getServer().getName();

    // Cached cost against the segments the server currently serves.
    final double servedSegmentsCost = clusterCostCache.computeCost(targetServerName, proposalSegment);

    // In-flight loads are not in the main cache; build a throwaway cache over them.
    final Set<DataSegment> segmentsBeingLoaded = server.getPeon().getSegmentsToLoad();
    final double loadingSegmentsCost = ClusterCostCache
        .builder(ImmutableMap.of(targetServerName, segmentsBeingLoaded))
        .build()
        .computeCost(targetServerName, proposalSegment);

    return servedSegmentsCost + loadingSegmentsCost;
  }
}
Loading