From ca4e35ea027c359bdb81277ab1fe7c1062cce8b4 Mon Sep 17 00:00:00 2001 From: Niketh Sabbineni Date: Tue, 1 Nov 2016 14:56:51 -0700 Subject: [PATCH 1/5] Normalized Cost Balancer --- .../coordinator/BalancerStrategyFactory.java | 16 +- .../CostBalancerStrategyFactory.java | 20 +-- .../DiskNormalizedCostBalancerStrategy.java | 69 +++++++++ ...NormalizedCostBalancerStrategyFactory.java | 30 ++++ .../server/coordinator/DruidCoordinator.java | 53 ++++--- .../DruidCoordinatorRuntimeParams.java | 26 ++-- .../RandomBalancerStrategyFactory.java | 9 +- .../helper/DruidCoordinatorBalancer.java | 3 +- .../server/coordinator/rules/LoadRule.java | 3 +- .../CostBalancerStrategyBenchmark.java | 18 ++- ...iskNormalizedCostBalancerStrategyTest.java | 142 ++++++++++++++++++ .../DruidCoordinatorBalancerTest.java | 31 +++- .../DruidCoordinatorRuleRunnerTest.java | 123 +++++++++++---- .../coordinator/DruidCoordinatorTest.java | 3 +- .../coordinator/rules/LoadRuleTest.java | 45 ++++-- .../java/io/druid/cli/CliCoordinator.java | 2 + 16 files changed, 469 insertions(+), 124 deletions(-) create mode 100644 server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java create mode 100644 server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyFactory.java create mode 100644 server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java index be11848dc0be..4c73297d0447 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java @@ -18,11 +18,17 @@ */ package io.druid.server.coordinator; -import org.joda.time.DateTime; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.util.concurrent.ListeningExecutorService; -import java.io.Closeable; - -public interface BalancerStrategyFactory extends Closeable +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "normalizedCostBalancer", value = DiskNormalizedCostBalancerStrategyFactory.class), + @JsonSubTypes.Type(name = "costBalancer", value = CostBalancerStrategyFactory.class), + @JsonSubTypes.Type(name = "randomBalancer", value = RandomBalancerStrategyFactory.class), +}) +public interface BalancerStrategyFactory { - public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp); + public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec); } diff --git a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java index e853302ae31c..016f6db0e1c9 100644 --- a/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/CostBalancerStrategyFactory.java @@ -19,30 +19,12 @@ package io.druid.server.coordinator; import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import org.joda.time.DateTime; - -import java.io.IOException; -import java.util.concurrent.Executors; public class CostBalancerStrategyFactory implements BalancerStrategyFactory { - private final ListeningExecutorService exec; - - public CostBalancerStrategyFactory(int costBalancerStrategyThreadCount) - { - this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(costBalancerStrategyThreadCount)); - } - @Override - public CostBalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) + public CostBalancerStrategy createBalancerStrategy(ListeningExecutorService exec) { return new CostBalancerStrategy(exec); } - - @Override - public void close() throws IOException - { - exec.shutdownNow(); - } } diff --git a/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java new file mode 100644 index 000000000000..039b82dd404b --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java @@ -0,0 +1,69 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.emitter.EmittingLogger; +import io.druid.java.util.common.Pair; +import io.druid.timeline.DataSegment; +import org.apache.commons.math3.util.FastMath; +import org.joda.time.Interval; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; + +public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy +{ + public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec) + { + super(exec); + } + + @Override + protected double computeCost( + final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer + ) + { + double cost = super.computeCost(proposalSegment, server, includeCurrentServer); + + if(cost == Double.POSITIVE_INFINITY){ + return cost; + } + + int nSegments = 1; + if(server.getServer().getSegments().size() > 0) + { + nSegments = server.getServer().getSegments().size(); + } + + double normalizedCost = cost/nSegments; + double usageRatio = (double)server.getServer().getCurrSize()/(double)server.getServer().getMaxSize(); + + return normalizedCost*usageRatio; + } +} + diff --git a/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyFactory.java new file mode 100644 index 000000000000..6cbb9ecaf9a9 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.server.coordinator; + +import com.google.common.util.concurrent.ListeningExecutorService; + +public class DiskNormalizedCostBalancerStrategyFactory implements BalancerStrategyFactory +{ + @Override + public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec) + { + return new DiskNormalizedCostBalancerStrategy(exec); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index f971066a8af0..8d2625db84f8 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -28,6 +28,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -84,6 +86,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -129,7 +132,7 @@ public Interval apply(DataSegment segment) private volatile int leaderCounter = 0; private volatile boolean leader = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; - + private final BalancerStrategyFactory factory; @Inject public DruidCoordinator( @@ -146,7 +149,8 @@ public DruidCoordinator( LoadQueueTaskMaster taskMaster, ServiceAnnouncer serviceAnnouncer, @Self DruidNode self, - @CoordinatorIndexingServiceHelper Set indexingServiceHelpers + @CoordinatorIndexingServiceHelper Set indexingServiceHelpers, + BalancerStrategyFactory factory ) { this( @@ -164,7 +168,8 @@ public DruidCoordinator( serviceAnnouncer, self, Maps.newConcurrentMap(), - indexingServiceHelpers + indexingServiceHelpers, + factory ); } @@ -183,7 +188,8 @@ public DruidCoordinator( ServiceAnnouncer serviceAnnouncer, DruidNode self, ConcurrentMap loadQueuePeonMap, - Set indexingServiceHelpers + Set indexingServiceHelpers, + BalancerStrategyFactory factory ) { this.config = config; @@ -205,6 +211,7 @@ public DruidCoordinator( this.leaderLatch = new AtomicReference<>(null); this.loadManagementPeons = loadQueuePeonMap; + this.factory = factory; } public boolean isLeader() @@ -664,6 +671,7 @@ protected CoordinatorRunnable(List helpers, final int st @Override public void run() { + ListeningExecutorService balancerExec = null; try { synchronized (lock) { final LeaderLatch latch = leaderLatch.get(); @@ -686,27 +694,32 @@ public void run() } } - try (BalancerStrategyFactory factory = - new CostBalancerStrategyFactory(getDynamicConfigs().getBalancerComputeThreads())) { - // Do coordinator stuff. - DruidCoordinatorRuntimeParams params = - DruidCoordinatorRuntimeParams.newBuilder() - .withStartTime(startTime) - .withDatasources(metadataSegmentManager.getInventory()) - .withDynamicConfigs(getDynamicConfigs()) - .withEmitter(emitter) - .withBalancerStrategyFactory(factory) - .build(); - for (DruidCoordinatorHelper helper : helpers) { - // Don't read state and run state in the same helper otherwise racy conditions may exist - if (leader && startingLeaderCounter == leaderCounter) { - params = helper.run(params); - } + balancerExec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(getDynamicConfigs().getBalancerComputeThreads())); + BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); + + // Do coordinator stuff. + DruidCoordinatorRuntimeParams params = + DruidCoordinatorRuntimeParams.newBuilder() + .withStartTime(startTime) + .withDatasources(metadataSegmentManager.getInventory()) + .withDynamicConfigs(getDynamicConfigs()) + .withEmitter(emitter) + .withBalancerStrategy(balancerStrategy) + .build(); + for (DruidCoordinatorHelper helper : helpers) { + // Don't read state and run state in the same helper otherwise racy conditions may exist + if (leader && startingLeaderCounter == leaderCounter) { + params = helper.run(params); } } } catch (Exception e) { log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); + } finally { + if(balancerExec != null){ + balancerExec.shutdownNow(); + } } } } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java index 90f473197351..daea2cb31dd9 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorRuntimeParams.java @@ -48,7 +48,7 @@ public class DruidCoordinatorRuntimeParams private final CoordinatorDynamicConfig coordinatorDynamicConfig; private final CoordinatorStats stats; private final DateTime balancerReferenceTimestamp; - private final BalancerStrategyFactory strategyFactory; + private final BalancerStrategy balancerStrategy; public DruidCoordinatorRuntimeParams( long startTime, @@ -63,7 +63,7 @@ public DruidCoordinatorRuntimeParams( CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorStats stats, DateTime balancerReferenceTimestamp, - BalancerStrategyFactory strategyFactory + BalancerStrategy balancerStrategy ) { this.startTime = startTime; @@ -78,7 +78,7 @@ public DruidCoordinatorRuntimeParams( this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.stats = stats; this.balancerReferenceTimestamp = balancerReferenceTimestamp; - this.strategyFactory = strategyFactory; + this.balancerStrategy = balancerStrategy; } public long getStartTime() @@ -141,9 +141,9 @@ public DateTime getBalancerReferenceTimestamp() return balancerReferenceTimestamp; } - public BalancerStrategyFactory getBalancerStrategyFactory() + public BalancerStrategy getBalancerStrategy() { - return strategyFactory; + return balancerStrategy; } public boolean hasDeletionWaitTimeElapsed() @@ -171,7 +171,7 @@ public Builder buildFromExisting() coordinatorDynamicConfig, stats, balancerReferenceTimestamp, - strategyFactory + balancerStrategy ); } @@ -190,7 +190,7 @@ public Builder buildFromExistingWithoutAvailableSegments() coordinatorDynamicConfig, stats, balancerReferenceTimestamp, - strategyFactory + balancerStrategy ); } @@ -208,7 +208,7 @@ public static class Builder private CoordinatorDynamicConfig coordinatorDynamicConfig; private CoordinatorStats stats; private DateTime balancerReferenceTimestamp; - private BalancerStrategyFactory strategyFactory; + private BalancerStrategy balancerStrategy; Builder() { @@ -239,7 +239,7 @@ public static class Builder CoordinatorDynamicConfig coordinatorDynamicConfig, CoordinatorStats stats, DateTime balancerReferenceTimestamp, - BalancerStrategyFactory strategyFactory + BalancerStrategy balancerStrategy ) { this.startTime = startTime; @@ -254,7 +254,7 @@ public static class Builder this.coordinatorDynamicConfig = coordinatorDynamicConfig; this.stats = stats; this.balancerReferenceTimestamp = balancerReferenceTimestamp; - this.strategyFactory=strategyFactory; + this.balancerStrategy=balancerStrategy; } public DruidCoordinatorRuntimeParams build() @@ -272,7 +272,7 @@ public DruidCoordinatorRuntimeParams build() coordinatorDynamicConfig, stats, balancerReferenceTimestamp, - strategyFactory + balancerStrategy ); } @@ -348,9 +348,9 @@ public Builder withBalancerReferenceTimestamp(DateTime balancerReferenceTimestam return this; } - public Builder withBalancerStrategyFactory(BalancerStrategyFactory strategyFactory) + public Builder withBalancerStrategy(BalancerStrategy balancerStrategy) { - this.strategyFactory=strategyFactory; + this.balancerStrategy=balancerStrategy; return this; } } diff --git a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java index 773ff16efd4f..a1344fb11259 100644 --- a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java @@ -18,6 +18,7 @@ */ package io.druid.server.coordinator; +import com.google.common.util.concurrent.ListeningExecutorService; import org.joda.time.DateTime; import java.io.IOException; @@ -25,14 +26,8 @@ public class RandomBalancerStrategyFactory implements BalancerStrategyFactory { @Override - public BalancerStrategy createBalancerStrategy(DateTime referenceTimestamp) + public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec) { return new RandomBalancerStrategy(); } - - @Override - public void close() throws IOException - { - // No-op - } } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index fda4cf634273..0dbbc0d90c2a 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -85,8 +85,7 @@ protected void reduceLifetimes(String tier) public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { final CoordinatorStats stats = new CoordinatorStats(); - final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); - final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); + final BalancerStrategy strategy = params.getBalancerStrategy(); final int maxSegmentsToMove = params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(); for (Map.Entry> entry : diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index b4ba58e5e544..4b5cfdac498b 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -72,8 +72,7 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim } final List serverHolderList = Lists.newArrayList(serverQueue); - final DateTime referenceTimestamp = params.getBalancerReferenceTimestamp(); - final BalancerStrategy strategy = params.getBalancerStrategyFactory().createBalancerStrategy(referenceTimestamp); + final BalancerStrategy strategy = params.getBalancerStrategy(); if (availableSegments.contains(segment)) { CoordinatorStats assignStats = assign( params.getReplicationManager(), diff --git a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java index 4974fb563aa0..07226e3ef9a4 100644 --- a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java +++ b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java @@ -21,6 +21,7 @@ import com.carrotsearch.junitbenchmarks.AbstractBenchmark; import com.carrotsearch.junitbenchmarks.BenchmarkOptions; +import com.google.common.util.concurrent.MoreExecutors; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -33,29 +34,32 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.Executors; @Ignore @RunWith(Parameterized.class) public class CostBalancerStrategyBenchmark extends AbstractBenchmark { @Parameterized.Parameters - public static List factoryClasses() + public static List factoryClasses() { return Arrays.asList( - (CostBalancerStrategyFactory[]) Arrays.asList( - new CostBalancerStrategyFactory(1) + (CostBalancerStrategy[]) Arrays.asList( + new CostBalancerStrategy(MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1))) ).toArray(), - (CostBalancerStrategyFactory[]) Arrays.asList( - new CostBalancerStrategyFactory(4) + (CostBalancerStrategy[]) Arrays.asList( + new CostBalancerStrategy(MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(4))) ).toArray() ); } private final CostBalancerStrategy strategy; - public CostBalancerStrategyBenchmark(CostBalancerStrategyFactory factory) + public CostBalancerStrategyBenchmark(CostBalancerStrategy costBalancerStrategy) { - this.strategy = factory.createBalancerStrategy(DateTime.now()); + this.strategy = costBalancerStrategy; } private static List serverHolderList; diff --git a/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java b/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java new file mode 100644 index 000000000000..c04ab11daefe --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.MoreExecutors; +import io.druid.client.ImmutableDruidDataSource; +import io.druid.client.ImmutableDruidServer; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; + +public class DiskNormalizedCostBalancerStrategyTest +{ + private static final Interval day = new Interval("2015-01-01T00/2015-01-01T01"); + + /** + * Create Druid cluster with serverCount servers having maxSegments segments each, and 1 server with 98 segment + * Cost Balancer Strategy should assign the next segment to the server with less segments. + */ + public static List setupDummyCluster(int serverCount, int maxSegments) + { + List serverHolderList = Lists.newArrayList(); + // Create 10 servers with current size being 3K & max size being 10K + // Each having having 100 segments + for (int i = 0; i < serverCount; i++) { + LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); + Map segments = Maps.newHashMap(); + for (int j = 0; j < maxSegments; j++) { + DataSegment segment = getSegment(j); + segments.put(segment.getIdentifier(), segment); + } + + serverHolderList.add( + new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("DruidServer_Name_" + i, "localhost", 10000000L, "hot", "hot", 1), + 3000L, + ImmutableMap.of("DUMMY", EasyMock.createMock(ImmutableDruidDataSource.class)), + ImmutableMap.copyOf(segments) + ), + fromPeon + )); + } + + // The best server to be available for next segment assignment has greater max Size + LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); + ImmutableDruidServer druidServer = EasyMock.createMock(ImmutableDruidServer.class); + EasyMock.expect(druidServer.getName()).andReturn("BEST_SERVER").anyTimes(); + EasyMock.expect(druidServer.getCurrSize()).andReturn(3000L).anyTimes(); + EasyMock.expect(druidServer.getMaxSize()).andReturn(100000000L).anyTimes(); + + EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes(); + Map segments = Maps.newHashMap(); + for (int j = 0; j < maxSegments; j++) { + DataSegment segment = getSegment(j); + segments.put(segment.getIdentifier(), segment); + EasyMock.expect(druidServer.getSegment(segment.getIdentifier())).andReturn(segment).anyTimes(); + } + EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes(); + + EasyMock.replay(druidServer); + serverHolderList.add(new ServerHolder(druidServer, fromPeon)); + return serverHolderList; + } + + /** + * Returns segment with dummy id and size 100 + * + * @param index + * + * @return segment + */ + public static DataSegment getSegment(int index) + { + return getSegment(index, "DUMMY", day); + } + + public static DataSegment getSegment(int index, String dataSource, Interval interval) + { + // Not using EasyMock as it hampers the performance of multithreads. + DataSegment segment = new DataSegment( + dataSource, interval, String.valueOf(index), Maps.newConcurrentMap(), + Lists.newArrayList(), Lists.newArrayList(), null, 0, index * 100L + ); + return segment; + } + + @Test + public void testNormalizedCostBalancerMultiThreadedStrategy() throws InterruptedException + { + List serverHolderList = setupDummyCluster(10, 20); + DataSegment segment = getSegment(1000); + + BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy( + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4)) + ); + ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); + Assert.assertNotNull("Should be able to find a place for new segment!!", holder); + Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); + } + + @Test + public void testNormalizedCostBalancerSingleThreadStrategy() throws InterruptedException + { + List serverHolderList = setupDummyCluster(10, 20); + DataSegment segment = getSegment(1000); + + BalancerStrategy strategy = new DiskNormalizedCostBalancerStrategy( + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)) + ); + ServerHolder holder = strategy.findNewSegmentHomeReplicator(segment, serverHolderList); + Assert.assertNotNull("Should be able to find a place for new segment!!", holder); + Assert.assertEquals("Best Server should be BEST_SERVER", "BEST_SERVER", holder.getServer().getName()); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java index adab7c4da97a..29ca9b4d33c0 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java @@ -23,6 +23,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.ImmutableDruidServer; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; @@ -38,6 +40,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executors; /** */ @@ -167,6 +170,10 @@ public void testMoveToEmptyServerBalancer() throws IOException LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); LoadQueuePeonTester toPeon = new LoadQueuePeonTester(); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() @@ -198,19 +205,17 @@ public void testMoveToEmptyServerBalancer() throws IOException MAX_SEGMENTS_TO_MOVE ).build() ) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() < segments.size()); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } - - @Test public void testRun1() throws IOException { @@ -244,6 +249,11 @@ public void testRun1() throws IOException EasyMock.expectLastCall().anyTimes(); EasyMock.replay(coordinator); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + LoadQueuePeonTester fromPeon = new LoadQueuePeonTester(); LoadQueuePeonTester toPeon = new LoadQueuePeonTester(); DruidCoordinatorRuntimeParams params = @@ -275,13 +285,13 @@ public void testRun1() throws IOException new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE) .build() ) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } @@ -335,6 +345,11 @@ public void testRun2() throws IOException LoadQueuePeonTester peon3 = new LoadQueuePeonTester(); LoadQueuePeonTester peon4 = new LoadQueuePeonTester(); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster( @@ -371,13 +386,13 @@ public void testRun2() throws IOException MAX_SEGMENTS_TO_MOVE ).build() ) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); params = new DruidCoordinatorBalancerTester(coordinator).run(params); Assert.assertTrue(params.getCoordinatorStats().getPerTierStats().get("movedCount").get("normal").get() > 0); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 273a77e39135..41fada88accf 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEventBuilder; @@ -50,6 +52,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Executors; /** */ @@ -179,13 +182,18 @@ public void testRunThreeTiersOneReplicant() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build()) .build(); @@ -199,8 +207,8 @@ public void testRunThreeTiersOneReplicant() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null); Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } /** @@ -276,13 +284,18 @@ public void testRunTwoTiersTwoReplicants() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -294,8 +307,8 @@ public void testRunTwoTiersTwoReplicants() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null); Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } /** @@ -367,13 +380,18 @@ public void testRunTwoTiersWithExistingSegments() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -385,8 +403,8 @@ public void testRunTwoTiersWithExistingSegments() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedCount") == null); Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -432,6 +450,11 @@ public void testRunTwoTiersTierDoesNotExist() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withEmitter(emitter) @@ -439,15 +462,15 @@ public void testRunTwoTiersTierDoesNotExist() throws Exception .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); ruleRunner.run(params); + exec.shutdown(); EasyMock.verify(emitter); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -554,13 +577,18 @@ public void testDropRemove() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -569,8 +597,8 @@ public void testDropRemove() throws Exception Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); + exec.shutdown(); EasyMock.verify(coordinator); - params.getBalancerStrategyFactory().close(); } @Test @@ -633,13 +661,18 @@ public void testDropTooManyInSameTier() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -649,8 +682,8 @@ public void testDropTooManyInSameTier() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1); Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -719,13 +752,18 @@ public void testDropTooManyInDifferentTiers() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -735,8 +773,8 @@ public void testDropTooManyInDifferentTiers() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1); Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -801,13 +839,18 @@ public void testDontDropInDifferentTiers() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -817,8 +860,8 @@ public void testDontDropInDifferentTiers() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount") == null); Assert.assertTrue(stats.getGlobalStats().get("deletedCount").get() == 12); + exec.shutdown(); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); } @Test @@ -896,13 +939,18 @@ public void testDropServerActuallyServesSegment() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -911,9 +959,9 @@ public void testDropServerActuallyServesSegment() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 1); + exec.shutdown(); EasyMock.verify(mockPeon); EasyMock.verify(anotherMockPeon); - params.getBalancerStrategyFactory().close(); } /** @@ -971,13 +1019,18 @@ public void testReplicantThrottle() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -999,7 +1052,6 @@ public void testReplicantThrottle() throws Exception 1, 0 ); - afterParams.getBalancerStrategyFactory().close(); afterParams = ruleRunner.run( new DruidCoordinatorRuntimeParams.Builder() @@ -1007,7 +1059,7 @@ public void testReplicantThrottle() throws Exception .withEmitter(emitter) .withAvailableSegments(Arrays.asList(overFlowSegment)) .withDatabaseRuleManager(databaseRuleManager) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .build() @@ -1019,7 +1071,7 @@ public void testReplicantThrottle() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); EasyMock.verify(mockPeon); - afterParams.getBalancerStrategyFactory().close(); + exec.shutdown(); } /** @@ -1096,12 +1148,17 @@ public void testReplicantThrottleAcrossTiers() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .build(); @@ -1116,7 +1173,7 @@ public void testReplicantThrottleAcrossTiers() throws Exception Assert.assertTrue(stats.getPerTierStats().get("unassignedSize") == null); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } @Test @@ -1193,13 +1250,18 @@ public void testDropReplicantThrottle() throws Exception SegmentReplicantLookup segmentReplicantLookup = SegmentReplicantLookup.make(druidCluster); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMillisToWaitBeforeDeleting(0L).build()) .withAvailableSegments(longerAvailableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .build(); @@ -1208,7 +1270,7 @@ public void testDropReplicantThrottle() throws Exception Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("normal").get() == 24); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } @Test @@ -1274,13 +1336,18 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() throws Exception ) ); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder() .withDruidCluster(druidCluster) .withAvailableSegments(availableSegments) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) - .withBalancerStrategyFactory(new CostBalancerStrategyFactory(1)) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withDynamicConfigs(new CoordinatorDynamicConfig.Builder().withMaxSegmentsToMove(5).build()) .build(); @@ -1298,7 +1365,7 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() throws Exception Assert.assertEquals(availableSegments, afterParams.getAvailableSegments()); EasyMock.verify(mockPeon); - params.getBalancerStrategyFactory().close(); + exec.shutdown(); } private void mockCoordinator() diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index f329d45ffdcd..f9d8cadb0a28 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -189,7 +189,8 @@ public void unannounce(DruidNode node) }, druidNode, loadManagementPeons, - null + null, + new CostBalancerStrategyFactory() ); } diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 6d9e53beca4a..2d949ea25dd6 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -26,12 +26,15 @@ import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.logger.Logger; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.LoggingEmitter; import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.DruidServer; import io.druid.jackson.DefaultObjectMapper; +import io.druid.server.coordinator.BalancerStrategy; import io.druid.server.coordinator.CoordinatorStats; import io.druid.server.coordinator.CostBalancerStrategyFactory; import io.druid.server.coordinator.DruidCluster; @@ -53,6 +56,7 @@ import java.util.Arrays; import java.util.Map; +import java.util.concurrent.Executors; /** */ @@ -190,14 +194,18 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ); - CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) - .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment @@ -205,7 +213,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get(DruidServer.DEFAULT_TIER).get() == 2); - costBalancerStrategyFactory.close(); + exec.shutdown(); } @Test @@ -296,14 +304,18 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ); - CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) - .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment @@ -311,7 +323,7 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1); Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get(DruidServer.DEFAULT_TIER).get() == 1); - costBalancerStrategyFactory.close(); + exec.shutdown(); } @Test @@ -381,21 +393,26 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ) ); -CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); + CoordinatorStats stats = rule.run( null, DruidCoordinatorRuntimeParams.newBuilder() .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) .withReplicationManager(throttler) - .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertTrue(stats.getPerTierStats().get("assignedCount").get("hot").get() == 1); - costBalancerStrategyFactory.close(); + exec.shutdown(); } @Test @@ -481,7 +498,11 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) ) ) ); - CostBalancerStrategyFactory costBalancerStrategyFactory = new CostBalancerStrategyFactory(1); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1)); + BalancerStrategy balancerStrategy = + new CostBalancerStrategyFactory().createBalancerStrategy(exec); CoordinatorStats stats = rule.run( null, @@ -489,13 +510,13 @@ public boolean appliesTo(Interval interval, DateTime referenceTimestamp) .withDruidCluster(druidCluster) .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withReplicationManager(throttler) - .withBalancerStrategyFactory(costBalancerStrategyFactory) + .withBalancerStrategy(balancerStrategy) .withBalancerReferenceTimestamp(new DateTime("2013-01-01")) .withAvailableSegments(Arrays.asList(segment)).build(), segment ); Assert.assertTrue(stats.getPerTierStats().get("droppedCount").get("hot").get() == 1); - costBalancerStrategyFactory.close(); + exec.shutdown(); } } diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index 549106652155..34da00216c26 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -51,6 +51,7 @@ import io.druid.metadata.MetadataStorage; import io.druid.metadata.MetadataStorageProvider; import io.druid.server.audit.AuditManagerProvider; +import io.druid.server.coordinator.BalancerStrategyFactory; import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.LoadQueueTaskMaster; @@ -128,6 +129,7 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class); JsonConfigProvider.bind(binder, "druid.manager.lookups", LookupCoordinatorManagerConfig.class); + JsonConfigProvider.bind(binder, "druid.coordinator.balancer", BalancerStrategyFactory.class); binder.bind(RedirectFilter.class).in(LazySingleton.class); binder.bind(RedirectInfo.class).to(CoordinatorRedirectInfo.class).in(LazySingleton.class); From 35db214d09ef20ca8cb801c081a385a1b75b7d03 Mon Sep 17 00:00:00 2001 From: Niketh Sabbineni Date: Wed, 2 Nov 2016 11:18:59 -0700 Subject: [PATCH 2/5] Adding documentation and renaming to use diskNormalizedCostBalancer --- docs/content/configuration/coordinator.md | 1 + .../io/druid/server/coordinator/BalancerStrategyFactory.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/content/configuration/coordinator.md b/docs/content/configuration/coordinator.md index 9aac673c7f35..b5064d30130f 100644 --- a/docs/content/configuration/coordinator.md +++ b/docs/content/configuration/coordinator.md @@ -32,6 +32,7 @@ The coordinator node uses several of the global configs in [Configuration](../co |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)| |`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)| |`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0| +|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use diskNormalizedCostBalancer to distribute segments among nodes so that the disks fill up uniformly.|costBalancer| ### Metadata Retrieval diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java index 4c73297d0447..dfa00fd1f928 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java @@ -24,7 +24,7 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class) @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "normalizedCostBalancer", value = DiskNormalizedCostBalancerStrategyFactory.class), + @JsonSubTypes.Type(name = "diskNormalizedCostBalancer", value = DiskNormalizedCostBalancerStrategyFactory.class), @JsonSubTypes.Type(name = "costBalancer", value = CostBalancerStrategyFactory.class), @JsonSubTypes.Type(name = "randomBalancer", value = RandomBalancerStrategyFactory.class), }) From 010d640f5d7d41417e5ae6dcf9deacdada03c707 Mon Sep 17 00:00:00 2001 From: Niketh Sabbineni Date: Wed, 2 Nov 2016 11:37:12 -0700 Subject: [PATCH 3/5] Remove balancer from the strings --- docs/content/configuration/coordinator.md | 2 +- .../druid/server/coordinator/BalancerStrategyFactory.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/content/configuration/coordinator.md b/docs/content/configuration/coordinator.md index b5064d30130f..760c52bd2b51 100644 --- a/docs/content/configuration/coordinator.md +++ b/docs/content/configuration/coordinator.md @@ -32,7 +32,7 @@ The coordinator node uses several of the global configs in [Configuration](../co |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)| |`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)| |`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0| -|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use diskNormalizedCostBalancer to distribute segments among nodes so that the disks fill up uniformly.|costBalancer| +|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use diskNormalized to distribute segments among nodes so that the disks fill up uniformly.|cost| ### Metadata Retrieval diff --git a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java index dfa00fd1f928..011b63f59088 100644 --- a/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/BalancerStrategyFactory.java @@ -24,9 +24,9 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = CostBalancerStrategyFactory.class) @JsonSubTypes(value = { - @JsonSubTypes.Type(name = "diskNormalizedCostBalancer", value = DiskNormalizedCostBalancerStrategyFactory.class), - @JsonSubTypes.Type(name = "costBalancer", value = CostBalancerStrategyFactory.class), - @JsonSubTypes.Type(name = "randomBalancer", value = RandomBalancerStrategyFactory.class), + @JsonSubTypes.Type(name = "diskNormalized", value = DiskNormalizedCostBalancerStrategyFactory.class), + @JsonSubTypes.Type(name = "cost", value = CostBalancerStrategyFactory.class), + @JsonSubTypes.Type(name = "random", value = RandomBalancerStrategyFactory.class), }) public interface BalancerStrategyFactory { From d08ecbcd08dfcd2017cb1a7a494e4568b399d671 Mon Sep 17 00:00:00 2001 From: Niketh Sabbineni Date: Wed, 23 Nov 2016 10:18:38 -0800 Subject: [PATCH 4/5] Update docs and include random cost balancer --- docs/content/configuration/coordinator.md | 2 +- .../coordinator/DiskNormalizedCostBalancerStrategy.java | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/content/configuration/coordinator.md b/docs/content/configuration/coordinator.md index 760c52bd2b51..5ed2c2c70867 100644 --- a/docs/content/configuration/coordinator.md +++ b/docs/content/configuration/coordinator.md @@ -32,7 +32,7 @@ The coordinator node uses several of the global configs in [Configuration](../co |`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)| |`druid.coordinator.kill.durationToRetain`| Do not kill segments in last `durationToRetain`, must be greater or equal to 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|PT-1S (-1 seconds)| |`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0| -|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use diskNormalized to distribute segments among nodes so that the disks fill up uniformly.|cost| +|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`| ### Metadata Retrieval diff --git a/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java index 039b82dd404b..cb941b8703ad 100644 --- a/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java @@ -43,6 +43,11 @@ public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec) super(exec); } + /** + * Averages the cost obtained from CostBalancerStrategy. Also the costs are weighted according to their usage ratios. + * This ensures that all the hosts will have the same % disk utilization. + * + */ @Override protected double computeCost( final DataSegment proposalSegment, final ServerHolder server, final boolean includeCurrentServer From 3024aa0331becb4e6e7e7a01fece3c5eccf1e0c7 Mon Sep 17 00:00:00 2001 From: Niketh Sabbineni Date: Wed, 23 Nov 2016 11:33:30 -0800 Subject: [PATCH 5/5] Fix checkstyle issues --- .../DiskNormalizedCostBalancerStrategy.java | 15 --------------- .../RandomBalancerStrategyFactory.java | 3 --- .../helper/DruidCoordinatorBalancer.java | 1 - .../druid/server/coordinator/rules/LoadRule.java | 1 - .../CostBalancerStrategyBenchmark.java | 1 - 5 files changed, 21 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java index cb941b8703ad..77203a06cc32 100644 --- a/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java +++ b/server/src/main/java/io/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java @@ -19,22 +19,8 @@ package io.druid.server.coordinator; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.metamx.emitter.EmittingLogger; -import io.druid.java.util.common.Pair; import io.druid.timeline.DataSegment; -import org.apache.commons.math3.util.FastMath; -import org.joda.time.Interval; - -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy { @@ -46,7 +32,6 @@ public DiskNormalizedCostBalancerStrategy(ListeningExecutorService exec) /** * Averages the cost obtained from CostBalancerStrategy. Also the costs are weighted according to their usage ratios. * This ensures that all the hosts will have the same % disk utilization. - * */ @Override protected double computeCost( diff --git a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java index a1344fb11259..ca7639db2a05 100644 --- a/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java +++ b/server/src/main/java/io/druid/server/coordinator/RandomBalancerStrategyFactory.java @@ -19,9 +19,6 @@ package io.druid.server.coordinator; import com.google.common.util.concurrent.ListeningExecutorService; -import org.joda.time.DateTime; - -import java.io.IOException; public class RandomBalancerStrategyFactory implements BalancerStrategyFactory { diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java index 0dbbc0d90c2a..67e750dd927f 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java @@ -34,7 +34,6 @@ import io.druid.server.coordinator.LoadQueuePeon; import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; -import org.joda.time.DateTime; import java.util.Comparator; import java.util.List; diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 4b5cfdac498b..a19362267e69 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -33,7 +33,6 @@ import io.druid.server.coordinator.ReplicationThrottler; import io.druid.server.coordinator.ServerHolder; import io.druid.timeline.DataSegment; -import org.joda.time.DateTime; import java.util.List; import java.util.Map; diff --git a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java index 07226e3ef9a4..cec23cc5dfc7 100644 --- a/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java +++ b/server/src/test/java/io/druid/server/coordinator/CostBalancerStrategyBenchmark.java @@ -23,7 +23,6 @@ import com.carrotsearch.junitbenchmarks.BenchmarkOptions; import com.google.common.util.concurrent.MoreExecutors; import io.druid.timeline.DataSegment; -import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.AfterClass; import org.junit.BeforeClass;