Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.server.coordinator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -149,6 +150,9 @@ public class DruidCoordinator
private volatile boolean started = false;
private volatile SegmentReplicantLookup segmentReplicantLookup = null;

private int cachedBalancerThreadNumber;
private ListeningExecutorService balancerExec;

@Inject
public DruidCoordinator(
DruidCoordinatorConfig config,
Expand Down Expand Up @@ -483,6 +487,18 @@ public void moveSegment(
}
}

@VisibleForTesting
public int getCachedBalancerThreadNumber()
{
return cachedBalancerThreadNumber;
}

@VisibleForTesting
public ListeningExecutorService getBalancerExec()
{
return balancerExec;
}

@LifecycleStart
public void start()
{
Expand Down Expand Up @@ -524,6 +540,10 @@ public void stop()
started = false;

exec.shutdownNow();

if (balancerExec != null) {
balancerExec.shutdownNow();
}
}
}

Expand Down Expand Up @@ -612,6 +632,11 @@ private void stopBeingLeader()
lookupCoordinatorManager.stop();
metadataRuleManager.stop();
segmentsMetadataManager.stopPollingDatabasePeriodically();

if (balancerExec != null) {
balancerExec.shutdownNow();
balancerExec = null;
}
}
}

Expand Down Expand Up @@ -647,22 +672,52 @@ private List<CoordinatorDuty> makeCompactSegmentsDuty()
return ImmutableList.of(compactSegments);
}

private class DutiesRunnable implements Runnable
@VisibleForTesting
protected class DutiesRunnable implements Runnable
Comment thread
ArvinZheng marked this conversation as resolved.
{
private final long startTimeNanos = System.nanoTime();
private final List<CoordinatorDuty> duties;
private final int startingLeaderCounter;

private DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter)
{
this.duties = duties;
this.startingLeaderCounter = startingLeaderCounter;
}

@VisibleForTesting
protected void initBalancerExecutor()
{
final int currentNumber = getDynamicConfigs().getBalancerComputeThreads();
final String threadNameFormat = "coordinator-cost-balancer-%s";
// fist time initialization
if (balancerExec == null) {
balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
currentNumber,
threadNameFormat
));
cachedBalancerThreadNumber = currentNumber;
return;
}

if (cachedBalancerThreadNumber != currentNumber) {
log.info(
"balancerComputeThreads has been changed from [%s] to [%s], recreating the thread pool.",
cachedBalancerThreadNumber,
currentNumber
);
balancerExec.shutdownNow();
balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
currentNumber,
threadNameFormat
));
cachedBalancerThreadNumber = currentNumber;
}
}

@Override
public void run()
{
ListeningExecutorService balancerExec = null;
try {
synchronized (lock) {
if (!coordLeaderSelector.isLeader()) {
Expand All @@ -684,10 +739,7 @@ public void run()
}
}

balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(
getDynamicConfigs().getBalancerComputeThreads(),
"coordinator-cost-balancer-%s"
));
initBalancerExecutor();
BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);

// Do coordinator stuff.
Expand Down Expand Up @@ -733,11 +785,6 @@ public void run()
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit();
}
finally {
if (balancerExec != null) {
balancerExec.shutdownNow();
Comment thread
ArvinZheng marked this conversation as resolved.
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListeningExecutorService;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -665,6 +666,74 @@ public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWith
EasyMock.verify(metadataRuleManager);
}

@Test
public void testBalancerThreadNumber()
{
CoordinatorDynamicConfig dynamicConfig = EasyMock.createNiceMock(CoordinatorDynamicConfig.class);
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2);
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once();

JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
EasyMock.expect(
configManager.watch(
EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
EasyMock.anyObject(Class.class),
EasyMock.anyObject()
)
).andReturn(new AtomicReference(dynamicConfig)).anyTimes();

ScheduledExecutorFactory scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);

DruidCoordinator c = new DruidCoordinator(
null,
null,
configManager,
null,
null,
null,
null,
null,
scheduledExecutorFactory,
null,
null,
null,
null,
null,
null,
null,
null,
null
);

DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0);
// before initialization
Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
Assert.assertNull(c.getBalancerExec());

// first initialization
duty.initBalancerExecutor();
System.out.println("c.getCachedBalancerThreadNumber(): " + c.getCachedBalancerThreadNumber());
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService firstExec = c.getBalancerExec();
Assert.assertNotNull(firstExec);

// second initialization, expect no changes as cachedBalancerThreadNumber is not changed
duty.initBalancerExecutor();
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService secondExec = c.getBalancerExec();
Assert.assertNotNull(secondExec);
Assert.assertTrue(firstExec == secondExec);

// third initialization, expect executor recreated as cachedBalancerThreadNumber is changed to 10
duty.initBalancerExecutor();
Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
ListeningExecutorService thirdExec = c.getBalancerExec();
Assert.assertNotNull(thirdExec);
Assert.assertFalse(secondExec == thirdExec);
Assert.assertFalse(firstExec == thirdExec);
}

private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
PathChildrenCache pathChildrenCache,
Map<String, DataSegment> segments,
Expand Down