diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 806ba043204f..a40676144517 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -149,6 +150,9 @@ public class DruidCoordinator private volatile boolean started = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; + private int cachedBalancerThreadNumber; + private ListeningExecutorService balancerExec; + @Inject public DruidCoordinator( DruidCoordinatorConfig config, @@ -483,6 +487,18 @@ public void moveSegment( } } + @VisibleForTesting + public int getCachedBalancerThreadNumber() + { + return cachedBalancerThreadNumber; + } + + @VisibleForTesting + public ListeningExecutorService getBalancerExec() + { + return balancerExec; + } + @LifecycleStart public void start() { @@ -524,6 +540,10 @@ public void stop() started = false; exec.shutdownNow(); + + if (balancerExec != null) { + balancerExec.shutdownNow(); + } } } @@ -612,6 +632,11 @@ private void stopBeingLeader() lookupCoordinatorManager.stop(); metadataRuleManager.stop(); segmentsMetadataManager.stopPollingDatabasePeriodically(); + + if (balancerExec != null) { + balancerExec.shutdownNow(); + balancerExec = null; + } } } @@ -647,22 +672,52 @@ private List makeCompactSegmentsDuty() return ImmutableList.of(compactSegments); } - private class DutiesRunnable implements Runnable + @VisibleForTesting + protected class DutiesRunnable implements Runnable { private final long startTimeNanos = System.nanoTime(); private final List duties; private final int startingLeaderCounter; - private DutiesRunnable(List duties, final int startingLeaderCounter) + protected DutiesRunnable(List duties, final int startingLeaderCounter) { this.duties = duties; this.startingLeaderCounter = startingLeaderCounter; } + @VisibleForTesting + protected void initBalancerExecutor() + { + final int currentNumber = getDynamicConfigs().getBalancerComputeThreads(); + final String threadNameFormat = "coordinator-cost-balancer-%s"; + // fist time initialization + if (balancerExec == null) { + balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded( + currentNumber, + threadNameFormat + )); + cachedBalancerThreadNumber = currentNumber; + return; + } + + if (cachedBalancerThreadNumber != currentNumber) { + log.info( + "balancerComputeThreads has been changed from [%s] to [%s], recreating the thread pool.", + cachedBalancerThreadNumber, + currentNumber + ); + balancerExec.shutdownNow(); + balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded( + currentNumber, + threadNameFormat + )); + cachedBalancerThreadNumber = currentNumber; + } + } + @Override public void run() { - ListeningExecutorService balancerExec = null; try { synchronized (lock) { if (!coordLeaderSelector.isLeader()) { @@ -684,10 +739,7 @@ public void run() } } - balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded( - getDynamicConfigs().getBalancerComputeThreads(), - "coordinator-cost-balancer-%s" - )); + initBalancerExecutor(); BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); // Do coordinator stuff. @@ -733,11 +785,6 @@ public void run() catch (Exception e) { log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); } - finally { - if (balancerExec != null) { - balancerExec.shutdownNow(); - } - } } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 51308cbd8fa9..9d1756e27de7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListeningExecutorService; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.curator.framework.CuratorFramework; @@ -665,6 +666,74 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith EasyMock.verify(metadataRuleManager); } + @Test + public void testBalancerThreadNumber() + { + CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class); + EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2); + EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once(); + + JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class); + EasyMock.expect( + configManager.watch( + EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY), + EasyMock.anyObject(Class.class), + EasyMock.anyObject() + ) + ).andReturn(new AtomicReference(dynamicConfig)).anyTimes(); + + ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); + EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory); + + DruidCoordinator c = new DruidCoordinator( + null, + null, + configManager, + null, + null, + null, + null, + null, + scheduledExecutorFactory, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0); + // before initialization + Assert.assertEquals(0, c.getCachedBalancerThreadNumber()); + Assert.assertNull(c.getBalancerExec()); + + // first initialization + duty.initBalancerExecutor(); + System.out.println("c.getCachedBalancerThreadNumber(): " + c.getCachedBalancerThreadNumber()); + Assert.assertEquals(5, c.getCachedBalancerThreadNumber()); + ListeningExecutorService firstExec = c.getBalancerExec(); + Assert.assertNotNull(firstExec); + + // second initialization, expect no changes as cachedBalancerThreadNumber is not changed + duty.initBalancerExecutor(); + Assert.assertEquals(5, c.getCachedBalancerThreadNumber()); + ListeningExecutorService secondExec = c.getBalancerExec(); + Assert.assertNotNull(secondExec); + Assert.assertTrue(firstExec == secondExec); + + // third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10 + duty.initBalancerExecutor(); + Assert.assertEquals(10, c.getCachedBalancerThreadNumber()); + ListeningExecutorService thirdExec = c.getBalancerExec(); + Assert.assertNotNull(thirdExec); + Assert.assertFalse(secondExec == thirdExec); + Assert.assertFalse(firstExec == thirdExec); + } + private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount, PathChildrenCache pathChildrenCache, Map segments,