diff --git a/docs/content/configuration/coordinator.md b/docs/content/configuration/coordinator.md index 9aac673c7f35..5ed2c2c70867 100644 --- a/docs/content/configuration/coordinator.md +++ b/docs/content/configuration/coordinator.md @@ -32,6 +32,7 @@ The coordinator node uses several of the global configs in [Configuration](../co |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)| |`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)| |`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0| +|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`| ### Metadata Retrieval diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java index be11848dc0be..011b63f59088 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java @@ -18,11 +18,17 @@ */ package io.druid.server.coordinator; -import org.joda.time.DateTime; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.util.concurrent.ListeningExecutorService; -import java.io.Closeable; - -public interface BalancerStrategyFactory extends Closeable +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "diskNormalized", value = DiskNormalizedCostBalancerStrategyFactory.class), + @JsonSubTypes.Type(name = "cost", value = CostBalancerStrategyFactory.class), + @JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class), +}) +public interface BalancerStrategyFactory { - public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp); + public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec); } diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java index e853302ae31c..016f6db0e1c9 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java @@ -19,30 +19,12 @@ package io.druid.server.coordinator; import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import org.joda.time.DateTime; - -import java.io.IOException; -import java.util.concurrent.Executors; public class CostBalancerStrategyFactory implements BalancerStrategyFactory { - private final ListeningExecutorService exec; - - public CostBalancerStrategyFactory(int costBalancerStrategyThreadCount) - { - this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(costBalancerStrategyThreadCount)); - } - @Override - public CostBalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) + public CostBalancerStrategy createBalancerStrategy(ListeningExecutorService exec) { return new CostBalancerStrategy(exec); } - - @Override - public void close() throws IOException - { - exec.shutdownNow(); - } } diff --git a/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java new file mode 100644 index 000000000000..77203a06cc32 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java @@ -0,0 +1,59 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.google.common.util.concurrent.ListeningExecutorService; +import io.druid.timeline.DataSegment; + +public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy +{ + public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec) + { + super(exec); + } + + /** + * Averages the cost obtained from CostBalancerStrategy. Also the costs are weighted according to their usage ratios. + * This ensures that all the hosts will have the same % disk utilization. + */ + @Override + protected double computeCost( + final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer + ) + { + double cost = super.computeCost(proposalSegment, server, includeCurrentServer); + + if(cost == Double.POSITIVE_INFINITY){ + return cost; + } + + int nSegments = 1; + if(server.getServer().getSegments().size() > 0) + { + nSegments = server.getServer().getSegments().size(); + } + + double normalizedCost = cost/nSegments; + double usageRatio = (double)server.getServer().getCurrSize()/(double)server.getServer().getMaxSize(); + + return normalizedCost*usageRatio; + } +} + diff --git a/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyFactory.java new file mode 100644 index 000000000000..6cbb9ecaf9a9 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.server.coordinator; + +import com.google.common.util.concurrent.ListeningExecutorService; + +public class DiskNormalizedCostBalancerStrategyFactory implements BalancerStrategyFactory +{ + @Override + public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec) + { + return new DiskNormalizedCostBalancerStrategy(exec); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index f971066a8af0..8d2625db84f8 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -28,6 +28,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -84,6 +86,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -129,7 +132,7 @@ public Interval apply(DataSegment segment) private volatile int leaderCounter = 0; private volatile boolean leader = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; - + private final BalancerStrategyFactory factory; @Inject public DruidCoordinator( @@ -146,7 +149,8 @@ public DruidCoordinator( LoadQueueTaskMaster taskMaster, ServiceAnnouncer serviceAnnouncer, @Self DruidNode self, - @CoordinatorIndexingServiceHelper Set indexingServiceHelpers + @CoordinatorIndexingServiceHelper Set indexingServiceHelpers, + BalancerStrategyFactory factory ) { this( @@ -164,7 +168,8 @@ public DruidCoordinator( serviceAnnouncer, self, Maps.newConcurrentMap(), - indexingServiceHelpers + indexingServiceHelpers, + factory ); } @@ -183,7 +188,8 @@ public DruidCoordinator( ServiceAnnouncer serviceAnnouncer, DruidNode self, ConcurrentMap loadQueuePeonMap, - Set indexingServiceHelpers + Set indexingServiceHelpers, + BalancerStrategyFactory factory ) { this.config = config; @@ -205,6 +211,7 @@ public DruidCoordinator( this.leaderLatch = new AtomicReference<>(null); this.loadManagementPeons = loadQueuePeonMap; + this.factory = factory; } public boolean isLeader() @@ -664,6 +671,7 @@ protected CoordinatorRunnable(List helpers, final int st @Override public void run() { + ListeningExecutorService balancerExec = null; try { synchronized (lock) { final LeaderLatch latch = leaderLatch.get(); @@ -686,27 +694,32 @@ public void run() } } - try (BalancerStrategyFactory factory = - new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads())) { - // Do coordinator stuff. - DruidCoordinatorRuntimeParams params = - DruidCoordinatorRuntimeParams.newBuilder() - .withStartTime(startTime) - .withDatasources(metadataSegmentManager.getInventory()) - .withDynamicConfigs(getDynamicConfigs()) - .withEmitter(emitter) - .withBalancerStrategyFactory(factory) - .build(); - for (DruidCoordinatorHelper helper : helpers) { - // Don't read state and run state in the same helper otherwise racy conditions may exist - if (leader && startingLeaderCounter == leaderCounter) { - params = helper.run(params); - } + balancerExec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(getDynamicConfigs().getBalancerComputeThreads())); + BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); + + // Do coordinator stuff. + DruidCoordinatorRuntimeParams params = + DruidCoordinatorRuntimeParams.newBuilder() + .withStartTime(startTime) + .withDatasources(metadataSegmentManager.getInventory()) + .withDynamicConfigs(getDynamicConfigs()) + .withEmitter(emitter) + .withBalancerStrategy(balancerStrategy) + .build(); + for (DruidCoordinatorHelper helper : helpers) { + // Don't read state and run state in the same helper otherwise racy conditions may exist + if (leader && startingLeaderCounter == leaderCounter) { + params = helper.run(params); } } } catch (Exception e) { log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); + } finally { + if(balancerExec != null){ + balancerExec.shutdownNow(); + } } } } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 90f473197351..daea2cb31dd9 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -48,7 +48,7 @@ public class DruidCoordinatorRuntimeParams private final CoordinatorDynamicConfig coordinatorDynamicConfig; private final CoordinatorStats stats; private final DateTime balancerReferenceTimestamp; - private final BalancerStrategyFactory strategyFactory; + private final BalancerStrategy balancerStrategy; public DruidCoordinatorRuntimeParams( long startTime, @@ -63,7 +63,7 @@ public DruidCoordinatorRuntimeParams( CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorStats stats, DateTime balancerReferenceTimestamp, - BalancerStrategyFactory strategyFactory + BalancerStrategy balancerStrategy ) { this.startTime = startTime; @@ -78,7 +78,7 @@ public DruidCoordinatorRuntimeParams( this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.stats = stats; this.balancerReferenceTimestamp = balancerReferenceTimestamp; - this.strategyFactory = strategyFactory; + this.balancerStrategy = balancerStrategy; } public long getStartTime() @@ -141,9 +141,9 @@ public DateTime getBalancerReferenceTimestamp() return balancerReferenceTimestamp; } - public BalancerStrategyFactory getBalancerStrategyFactory() + public BalancerStrategy getBalancerStrategy() { - return strategyFactory; + return balancerStrategy; } public boolean hasDeletionWaitTimeElapsed() @@ -171,7 +171,7 @@ public Builder buildFromExisting() coordinatorDynamicConfig, stats, balancerReferenceTimestamp, - strategyFactory + balancerStrategy ); } @@ -190,7 +190,7 @@ public Builder buildFromExistingWithoutAvailableSegments() coordinatorDynamicConfig, stats, balancerReferenceTimestamp, - strategyFactory + balancerStrategy ); } @@ -208,7 +208,7 @@ public static class Builder private CoordinatorDynamicConfig coordinatorDynamicConfig; private CoordinatorStats stats; private DateTime balancerReferenceTimestamp; - private BalancerStrategyFactory strategyFactory; + private BalancerStrategy balancerStrategy; Builder() { @@ -239,7 +239,7 @@ public static class Builder CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorStats stats, DateTime balancerReferenceTimestamp, - BalancerStrategyFactory strategyFactory + BalancerStrategy balancerStrategy ) { this.startTime = startTime; @@ -254,7 +254,7 @@ public static class Builder this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.stats = stats; this.balancerReferenceTimestamp = balancerReferenceTimestamp; - this.strategyFactory=strategyFactory; + this.balancerStrategy=balancerStrategy; } public DruidCoordinatorRuntimeParams build() @@ -272,7 +272,7 @@ public DruidCoordinatorRuntimeParams build() coordinatorDynamicConfig, stats, balancerReferenceTimestamp, - strategyFactory + balancerStrategy ); } @@ -348,9 +348,9 @@ public Builder withBalancerReferenceTimestamp(DateTime balancerReferenceTimestam return this; } - public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory) + public Builder withBalancerStrategy(BalancerStrategy balancerStrategy) { - this.strategyFactory=strategyFactory; + this.balancerStrategy=balancerStrategy; return this; } } diff --git a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java index 773ff16efd4f..ca7639db2a05 100644 --- a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java @@ -18,21 +18,13 @@ */ package io.druid.server.coordinator; -import org.joda.time.DateTime; - -import java.io.IOException; +import com.google.common.util.concurrent.ListeningExecutorService; public class RandomBalancerStrategyFactory implements BalancerStrategyFactory { @Override - public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) + public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec) { return new RandomBalancerStrategy(); } - - @Override - public void close() throws IOException - { - // No-op - } } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index fda4cf634273..67e750dd927f 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -34,7 +34,6 @@ import io.druid.server.coordinator.LoadQueuePeon; import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; -import org.joda.time.DateTime; import java.util.Comparator; import java.util.List; @@ -85,8 +84,7 @@ protected void reduceLifetimes(String tier) public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { final CoordinatorStats stats = new CoordinatorStats(); - final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); - final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); + final BalancerStrategy strategy = params.getBalancerStrategy(); final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(); for (Map.Entry> entry : diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index b4ba58e5e544..a19362267e69 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -33,7 +33,6 @@ import io.druid.server.coordinator.ReplicationThrottler; import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; -import org.joda.time.DateTime; import java.util.List; import java.util.Map; @@ -72,8 +71,7 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim } final List serverHolderList = Lists.newArrayList(serverQueue); - final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); - final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); + final BalancerStrategy strategy = params.getBalancerStrategy(); if (availableSegments.contains(segment)) { CoordinatorStats assignStats = assign( params.getReplicationManager(), diff --git a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java index 4974fb563aa0..cec23cc5dfc7 100644 --- a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java +++ b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java @@ -21,8 +21,8 @@ import com.carrotsearch.junitbenchmarks.AbstractBenchmark; import com.carrotsearch.junitbenchmarks.BenchmarkOptions; +import com.google.common.util.concurrent.MoreExecutors; import io.druid.timeline.DataSegment; -import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -33,29 +33,32 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; @Ignore @RunWith(Parameterized.class) public class CostBalancerStrategyBenchmark extends AbstractBenchmark { @Parameterized.Parameters - public static List factoryClasses() + public static List factoryClasses() { return Arrays.asList( - (CostBalancerStrategyFactory[]) Arrays.asList( - new CostBalancerStrategyFactory(1) + (CostBalancerStrategy[]) Arrays.asList( + new CostBalancerStrategy(MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1))) ).toArray(), - (CostBalancerStrategyFactory[]) Arrays.asList( - new CostBalancerStrategyFactory(4) + (CostBalancerStrategy[]) Arrays.asList( + new CostBalancerStrategy(MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(4))) ).toArray() ); } private final CostBalancerStrategy strategy; - public CostBalancerStrategyBenchmark(CostBalancerStrategyFactory factory) + public CostBalancerStrategyBenchmark(CostBalancerStrategy costBalancerStrategy) { - this.strategy = factory.createBalancerStrategy(DateTime.now()); + this.strategy = costBalancerStrategy; } private static List serverHolderList; diff --git a/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java new file mode 100644 index 000000000000..c04ab11daefe --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.ImmutableDruidDataSource; +import io.druid.client.ImmutableDruidServer; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; + +public class DiskNormalizedCostBalancerStrategyTest +{ + private static final Interval day = new Interval("2015-01-01T00/2015-01-01T01"); + + /** + * Create Druid cluster with serverCount servers having maxSegments segments each, and 1 server with 98 segment + * Cost Balancer Strategy should assign the next segment to the server with less segments. + */ + public static List setupDummyCluster(int serverCount, int maxSegments) + { + List serverHolderList = Lists.newArrayList(); + // Create 10 servers with current size being 3K & max size being 10K + // Each having having 100 segments + for (int i = 0; i < serverCount; i++) { + LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); + Map segments = Maps.newHashMap(); + for (int j = 0; j < maxSegments; j++) { + DataSegment segment = getSegment(j); + segments.put(segment.getIdentifier(), segment); + } + + serverHolderList.add( + new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1), + 3000L, + ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)), + ImmutableMap.copyOf(segments) + ), + fromPeon + )); + } + + // The best server to be available for next segment assignment has greater max Size + LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); + ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class); + EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes(); + EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes(); + EasyMock.expect(druidServer.getMaxSize()).andReturn(100000000L).anyTimes(); + + EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + Map segments = Maps.newHashMap(); + for (int j = 0; j < maxSegments; j++) { + DataSegment segment = getSegment(j); + segments.put(segment.getIdentifier(), segment); + EasyMock.expect(druidServer.getSegment(segment.getIdentifier())).andReturn(segment).anyTimes(); + } + EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes(); + + EasyMock.replay(druidServer); + serverHolderList.add(new ServerHolder(druidServer, fromPeon)); + return serverHolderList; + } + + /** + * Returns segment with dummy id and size 100 + * + * @param index + * + * @return segment + */ + public static DataSegment getSegment(int index) + { + return getSegment(index, "DUMMY", day); + } + + public static DataSegment getSegment(int index, String dataSource, Interval interval) + { + // Not using EasyMock as it hampers the performance of multithreads. + DataSegment segment = new DataSegment( + dataSource, interval, String.valueOf(index), Maps.newConcurrentMap(), + Lists.newArrayList(), Lists.newArrayList(), null, 0, index * 100L + ); + return segment; + } + + @Test + public void testNormalizedCostBalancerMultiThreadedStrategy() throws InterruptedException + { + List serverHolderList = setupDummyCluster(10, 20); + DataSegment segment = getSegment(1000); + + BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy( + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)) + ); + ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); + Assert.assertNotNull("Should be able to find a place for new segment!!", holder); + Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); + } + + @Test + public void testNormalizedCostBalancerSingleThreadStrategy() throws InterruptedException + { + List serverHolderList = setupDummyCluster(10, 20); + DataSegment segment = getSegment(1000); + + BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy( + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)) + ); + ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); + Assert.assertNotNull("Should be able to find a place for new segment!!", holder); + Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java index adab7c4da97a..29ca9b4d33c0 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -23,6 +23,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.ImmutableDruidServer; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; @@ -38,6 +40,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executors; /** */ @@ -167,6 +170,10 @@ public void testMoveToEmptyServerBalancer() throws IOException LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); LoadQueuePeonTester toPeon = new LoadQueuePeonTester(); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() @@ -198,19 +205,17 @@ public void testMoveToEmptyServerBalancer() throws IOException MAX_SEGMENTS_TO_MOVE ).build() ) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size()); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } - - @Test public void testRun1() throws IOException { @@ -244,6 +249,11 @@ public void testRun1() throws IOException EasyMock.expectLastCall().anyTimes(); EasyMock.replay(coordinator); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); LoadQueuePeonTester toPeon = new LoadQueuePeonTester(); DruidCoordinatorRuntimeParams params = @@ -275,13 +285,13 @@ public void testRun1() throws IOException new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .build() ) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } @@ -335,6 +345,11 @@ public void testRun2() throws IOException LoadQueuePeonTester peon3 = new LoadQueuePeonTester(); LoadQueuePeonTester peon4 = new LoadQueuePeonTester(); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( @@ -371,13 +386,13 @@ public void testRun2() throws IOException MAX_SEGMENTS_TO_MOVE ).build() ) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 273a77e39135..41fada88accf 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEventBuilder; @@ -50,6 +52,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Executors; /** */ @@ -179,13 +182,18 @@ public void testRunThreeTiersOneReplicant() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build()) .build(); @@ -199,8 +207,8 @@ public void testRunThreeTiersOneReplicant() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null); Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } /** @@ -276,13 +284,18 @@ public void testRunTwoTiersTwoReplicants() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -294,8 +307,8 @@ public void testRunTwoTiersTwoReplicants() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null); Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } /** @@ -367,13 +380,18 @@ public void testRunTwoTiersWithExistingSegments() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -385,8 +403,8 @@ public void testRunTwoTiersWithExistingSegments() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null); Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -432,6 +450,11 @@ public void testRunTwoTiersTierDoesNotExist() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withEmitter(emitter) @@ -439,15 +462,15 @@ public void testRunTwoTiersTierDoesNotExist() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); ruleRunner.run(params); + exec.shutdown(); EasyMock.verify(emitter); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -554,13 +577,18 @@ public void testDropRemove() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -569,8 +597,8 @@ public void testDropRemove() throws Exception Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); + exec.shutdown(); EasyMock.verify(coordinator); - params.getBalancerStrategyFactory().close(); } @Test @@ -633,13 +661,18 @@ public void testDropTooManyInSameTier() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -649,8 +682,8 @@ public void testDropTooManyInSameTier() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1); Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -719,13 +752,18 @@ public void testDropTooManyInDifferentTiers() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -735,8 +773,8 @@ public void testDropTooManyInDifferentTiers() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1); Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -801,13 +839,18 @@ public void testDontDropInDifferentTiers() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -817,8 +860,8 @@ public void testDontDropInDifferentTiers() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null); Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -896,13 +939,18 @@ public void testDropServerActuallyServesSegment() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -911,9 +959,9 @@ public void testDropServerActuallyServesSegment() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1); + exec.shutdown(); EasyMock.verify(mockPeon); EasyMock.verify(anotherMockPeon); - params.getBalancerStrategyFactory().close(); } /** @@ -971,13 +1019,18 @@ public void testReplicantThrottle() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -999,7 +1052,6 @@ public void testReplicantThrottle() throws Exception 1, 0 ); - afterParams.getBalancerStrategyFactory().close(); afterParams = ruleRunner.run( new DruidCoordinatorRuntimeParams.Builder() @@ -1007,7 +1059,7 @@ public void testReplicantThrottle() throws Exception .withEmitter(emitter) .withAvailableSegments(Arrays.asList(overFlowSegment)) .withDatabaseRuleManager(databaseRuleManager) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .build() @@ -1019,7 +1071,7 @@ public void testReplicantThrottle() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); EasyMock.verify(mockPeon); - afterParams.getBalancerStrategyFactory().close(); + exec.shutdown(); } /** @@ -1096,12 +1148,17 @@ public void testReplicantThrottleAcrossTiers() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .build(); @@ -1116,7 +1173,7 @@ public void testReplicantThrottleAcrossTiers() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } @Test @@ -1193,13 +1250,18 @@ public void testDropReplicantThrottle() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(longerAvailableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -1208,7 +1270,7 @@ public void testDropReplicantThrottle() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } @Test @@ -1274,13 +1336,18 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build()) .build(); @@ -1298,7 +1365,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() throws Exception Assert.assertEquals(availableSegments, afterParams.getAvailableSegments()); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } private void mockCoordinator() diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index f329d45ffdcd..f9d8cadb0a28 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -189,7 +189,8 @@ public void unannounce(DruidNode node) }, druidNode, loadManagementPeons, - null + null, + new CostBalancerStrategyFactory() ); } diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 6d9e53beca4a..2d949ea25dd6 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -26,12 +26,15 @@ import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.logger.Logger; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.LoggingEmitter; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.DruidServer; import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.CostBalancerStrategyFactory; import io.druid.server.coordinator.DruidCluster; @@ -53,6 +56,7 @@ import java.util.Arrays; import java.util.Map; +import java.util.concurrent.Executors; /** */ @@ -190,14 +194,18 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ); - CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) - .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment @@ -205,7 +213,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 2); - costBalancerStrategyFactory.close(); + exec.shutdown(); } @Test @@ -296,14 +304,18 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ); - CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) - .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment @@ -311,7 +323,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1); Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1); - costBalancerStrategyFactory.close(); + exec.shutdown(); } @Test @@ -381,21 +393,26 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ) ); -CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .withReplicationManager(throttler) - .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); - costBalancerStrategyFactory.close(); + exec.shutdown(); } @Test @@ -481,7 +498,11 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ) ); - CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); CoordinatorStats stats = rule.run( null, @@ -489,13 +510,13 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) - .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1); - costBalancerStrategyFactory.close(); + exec.shutdown(); } } diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 549106652155..34da00216c26 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -51,6 +51,7 @@ import io.druid.metadata.MetadataStorage; import io.druid.metadata.MetadataStorageProvider; import io.druid.server.audit.AuditManagerProvider; +import io.druid.server.coordinator.BalancerStrategyFactory; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.LoadQueueTaskMaster; @@ -128,6 +129,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.lookups", LookupCoordinatorManagerConfig.class); + JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class); binder.bind(RedirectFilter.class).in(LazySingleton.class); binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class);