From babe78eafee7a70c996eeecab946a933b4746a07 Mon Sep 17 00:00:00 2001 From: Arvin Zheng Date: Thu, 13 Aug 2020 23:58:47 -0700 Subject: [PATCH 1/5] recreate the balancer executor only when needed --- .../server/coordinator/DruidCoordinator.java | 66 +++++++++++++++---- .../coordinator/DruidCoordinatorTest.java | 38 +++++++++++ 2 files changed, 92 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 806ba043204f..5329e88fc946 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -19,6 +19,7 @@ package org.apache.druid.server.coordinator; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -647,22 +648,53 @@ private List makeCompactSegmentsDuty() return ImmutableList.of(compactSegments); } - private class DutiesRunnable implements Runnable + protected class DutiesRunnable implements Runnable { private final long startTimeNanos = System.nanoTime(); private final List duties; private final int startingLeaderCounter; + private int cachedBalancerThreadNumber; + private ListeningExecutorService balancerExec; - private DutiesRunnable(List duties, final int startingLeaderCounter) + protected DutiesRunnable(List duties, final int startingLeaderCounter) { this.duties = duties; this.startingLeaderCounter = startingLeaderCounter; } + @VisibleForTesting + protected void initBalancerExecutor() + { + final int currentNumber = getDynamicConfigs().getBalancerComputeThreads(); + final String threadNameFormat = "coordinator-cost-balancer-%s"; + // first-time initialization + if (balancerExec == null) { + balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded( 
+ currentNumber, + threadNameFormat + )); + cachedBalancerThreadNumber = currentNumber; + return; + } + + if (cachedBalancerThreadNumber != currentNumber) { + log.info( + "balancerComputeThreads has been changed from [%s] to [%s], recreating the thread pool.", + cachedBalancerThreadNumber, + currentNumber + ); + balancerExec.shutdownNow(); + balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded( + currentNumber, + threadNameFormat + )); + cachedBalancerThreadNumber = currentNumber; + } + } + @Override public void run() { - ListeningExecutorService balancerExec = null; try { synchronized (lock) { if (!coordLeaderSelector.isLeader()) { @@ -684,10 +716,7 @@ public void run() } } - balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded( - getDynamicConfigs().getBalancerComputeThreads(), - "coordinator-cost-balancer-%s" - )); + initBalancerExecutor(); BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); // Do coordinator stuff. @@ -733,11 +762,24 @@ public void run() catch (Exception e) { log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); } - finally { - if (balancerExec != null) { - balancerExec.shutdownNow(); - } - } + } + + /** + * This method should be used for only testing. + */ + @VisibleForTesting + public int getCachedBalancerThreadNumber() + { + return cachedBalancerThreadNumber; + } + + /** + * This method should be used for only testing. 
+ */ + @VisibleForTesting + public ListeningExecutorService getBalancerExec() + { + return balancerExec; } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 51308cbd8fa9..6cd1ac4143d3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.ListeningExecutorService; import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.curator.framework.CuratorFramework; @@ -665,6 +666,43 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith EasyMock.verify(metadataRuleManager); } + @Test + public void testBalancerThreadNumber() throws InterruptedException + { + DruidCoordinator c = EasyMock.createNiceMock(DruidCoordinator.class); + CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class); + EasyMock.expect(c.getDynamicConfigs()).andReturn(dynamicConfig).anyTimes(); + EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2); + EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once(); + EasyMock.replay(c, dynamicConfig); + + DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0); + // before initialization + Assert.assertEquals(0, duty.getCachedBalancerThreadNumber()); + Assert.assertNull(duty.getBalancerExec()); + + // first initialization + duty.initBalancerExecutor(); + Assert.assertEquals(5, duty.getCachedBalancerThreadNumber()); + ListeningExecutorService firstExec = duty.getBalancerExec(); + 
Assert.assertNotNull(firstExec); + + // second initialization, expect no changes as cachedBalancerThreadNumber is not changed + duty.initBalancerExecutor(); + Assert.assertEquals(5, duty.getCachedBalancerThreadNumber()); + ListeningExecutorService secondExec = duty.getBalancerExec(); + Assert.assertNotNull(secondExec); + Assert.assertTrue(firstExec == secondExec); + + // third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10 + duty.initBalancerExecutor(); + Assert.assertEquals(10, duty.getCachedBalancerThreadNumber()); + ListeningExecutorService thirdExec = duty.getBalancerExec(); + Assert.assertNotNull(thirdExec); + Assert.assertFalse(secondExec == thirdExec); + Assert.assertFalse(firstExec == thirdExec); + } + private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount, PathChildrenCache pathChildrenCache, Map segments, From b92157c1427b373243ba0ed8229decb3d5332804 Mon Sep 17 00:00:00 2001 From: Arvin Zheng Date: Fri, 14 Aug 2020 11:18:29 -0700 Subject: [PATCH 2/5] fix UT error --- .../apache/druid/server/coordinator/DruidCoordinatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 6cd1ac4143d3..d3ed28799494 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -667,7 +667,7 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith } @Test - public void testBalancerThreadNumber() throws InterruptedException + public void testBalancerThreadNumber() { DruidCoordinator c = EasyMock.createNiceMock(DruidCoordinator.class); CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class); From 
ebd0666acee5e3cc46186626755278d42df1ce32 Mon Sep 17 00:00:00 2001 From: Arvin Zheng Date: Mon, 17 Aug 2020 23:02:30 -0700 Subject: [PATCH 3/5] shutdown the balancer executor in stopBeingLeader and stop --- .../server/coordinator/DruidCoordinator.java | 52 +++++++++++------- .../coordinator/DruidCoordinatorTest.java | 53 +++++++++++++++---- 2 files changed, 74 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 5329e88fc946..70a2a9c21f47 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -150,6 +150,9 @@ public class DruidCoordinator private volatile boolean started = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; + private int cachedBalancerThreadNumber; + private ListeningExecutorService balancerExec; + @Inject public DruidCoordinator( DruidCoordinatorConfig config, @@ -484,6 +487,24 @@ public void moveSegment( } } + /** + * This method should be used for only testing. + */ + @VisibleForTesting + public int getCachedBalancerThreadNumber() + { + return cachedBalancerThreadNumber; + } + + /** + * This method should be used for only testing. 
+ */ + @VisibleForTesting + public ListeningExecutorService getBalancerExec() + { + return balancerExec; + } + @LifecycleStart public void start() { @@ -525,6 +546,10 @@ public void stop() started = false; exec.shutdownNow(); + + if (balancerExec != null) { + balancerExec.shutdownNow(); + } } } @@ -613,6 +638,11 @@ private void stopBeingLeader() lookupCoordinatorManager.stop(); metadataRuleManager.stop(); segmentsMetadataManager.stopPollingDatabasePeriodically(); + + if (balancerExec != null) { + balancerExec.shutdownNow(); + balancerExec = null; + } } } @@ -653,8 +683,8 @@ protected class DutiesRunnable implements Runnable private final long startTimeNanos = System.nanoTime(); private final List duties; private final int startingLeaderCounter; - private int cachedBalancerThreadNumber; - private ListeningExecutorService balancerExec; + //private int cachedBalancerThreadNumber; + //private ListeningExecutorService balancerExec; protected DutiesRunnable(List duties, final int startingLeaderCounter) { @@ -763,24 +793,6 @@ public void run() log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); } } - - /** - * This method should be used for only testing. - */ - @VisibleForTesting - public int getCachedBalancerThreadNumber() - { - return cachedBalancerThreadNumber; - } - - /** - * This method should be used for only testing. 
- */ - @VisibleForTesting - public ListeningExecutorService getBalancerExec() - { - return balancerExec; - } } /** diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index d3ed28799494..9d1756e27de7 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -669,35 +669,66 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith @Test public void testBalancerThreadNumber() { - DruidCoordinator c = EasyMock.createNiceMock(DruidCoordinator.class); CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class); - EasyMock.expect(c.getDynamicConfigs()).andReturn(dynamicConfig).anyTimes(); EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2); EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once(); - EasyMock.replay(c, dynamicConfig); + + JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class); + EasyMock.expect( + configManager.watch( + EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY), + EasyMock.anyObject(Class.class), + EasyMock.anyObject() + ) + ).andReturn(new AtomicReference(dynamicConfig)).anyTimes(); + + ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); + EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory); + + DruidCoordinator c = new DruidCoordinator( + null, + null, + configManager, + null, + null, + null, + null, + null, + scheduledExecutorFactory, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0); // before initialization - Assert.assertEquals(0, 
duty.getCachedBalancerThreadNumber()); - Assert.assertNull(duty.getBalancerExec()); + Assert.assertEquals(0, c.getCachedBalancerThreadNumber()); + Assert.assertNull(c.getBalancerExec()); // first initialization duty.initBalancerExecutor(); - Assert.assertEquals(5, duty.getCachedBalancerThreadNumber()); - ListeningExecutorService firstExec = duty.getBalancerExec(); + System.out.println("c.getCachedBalancerThreadNumber(): " + c.getCachedBalancerThreadNumber()); + Assert.assertEquals(5, c.getCachedBalancerThreadNumber()); + ListeningExecutorService firstExec = c.getBalancerExec(); Assert.assertNotNull(firstExec); // second initialization, expect no changes as cachedBalancerThreadNumber is not changed duty.initBalancerExecutor(); - Assert.assertEquals(5, duty.getCachedBalancerThreadNumber()); - ListeningExecutorService secondExec = duty.getBalancerExec(); + Assert.assertEquals(5, c.getCachedBalancerThreadNumber()); + ListeningExecutorService secondExec = c.getBalancerExec(); Assert.assertNotNull(secondExec); Assert.assertTrue(firstExec == secondExec); // third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10 duty.initBalancerExecutor(); - Assert.assertEquals(10, duty.getCachedBalancerThreadNumber()); - ListeningExecutorService thirdExec = duty.getBalancerExec(); + Assert.assertEquals(10, c.getCachedBalancerThreadNumber()); + ListeningExecutorService thirdExec = c.getBalancerExec(); Assert.assertNotNull(thirdExec); Assert.assertFalse(secondExec == thirdExec); Assert.assertFalse(firstExec == thirdExec); From 1e502cb45efd86ff505976ce0b4df73bf2ce25f1 Mon Sep 17 00:00:00 2001 From: Arvin Zheng Date: Wed, 9 Sep 2020 13:51:10 -0700 Subject: [PATCH 4/5] remove commented code --- .../org/apache/druid/server/coordinator/DruidCoordinator.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 70a2a9c21f47..d897fde7c62d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -683,8 +683,6 @@ protected class DutiesRunnable implements Runnable private final long startTimeNanos = System.nanoTime(); private final List duties; private final int startingLeaderCounter; - //private int cachedBalancerThreadNumber; - //private ListeningExecutorService balancerExec; protected DutiesRunnable(List duties, final int startingLeaderCounter) { From 85575950e46ea8ab767d0b63b9ea18ce1e9c4550 Mon Sep 17 00:00:00 2001 From: Arvin Zheng Date: Thu, 10 Sep 2020 21:06:15 -0700 Subject: [PATCH 5/5] remove comments --- .../druid/server/coordinator/DruidCoordinator.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index d897fde7c62d..a40676144517 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -487,18 +487,12 @@ public void moveSegment( } } - /** - * This method should be used for only testing. - */ @VisibleForTesting public int getCachedBalancerThreadNumber() { return cachedBalancerThreadNumber; } - - /** - * This method should be used for only testing. - */ + @VisibleForTesting public ListeningExecutorService getBalancerExec() { @@ -678,6 +672,7 @@ private List makeCompactSegmentsDuty() return ImmutableList.of(compactSegments); } + @VisibleForTesting protected class DutiesRunnable implements Runnable { private final long startTimeNanos = System.nanoTime();