Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package io.druid.server.coordinator.helper;

import com.google.common.collect.Lists;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerSegmentHolder;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
Expand Down Expand Up @@ -103,31 +103,37 @@ private void balanceTier(
return;
}

final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
final List<ServerHolder> toMoveFrom = Lists.newArrayList(servers);
final List<ServerHolder> toMoveTo = Lists.newArrayList(servers);

if (serverHolderList.size() <= 1) {
if (toMoveTo.size() <= 1) {
log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
return;
}

int numSegments = 0;
for (ServerHolder server : serverHolderList) {
numSegments += server.getServer().getSegments().size();
for (ServerHolder sourceHolder : toMoveFrom) {
numSegments += sourceHolder.getServer().getSegments().size();
}

if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
return;
}

final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
long unmoved = 0L;
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
if (maxToLoad > 0) {
toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad);
}
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom);

if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
final ServerHolder destinationHolder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo);

if (holder != null) {
moveSegment(segmentToMove, holder.getServer(), params);
if (destinationHolder != null) {
moveSegment(segmentToMove, destinationHolder.getServer(), params);
} else {
++unmoved;
}
Expand All @@ -140,7 +146,7 @@ private void balanceTier(
stats.addToTieredStat("unmovedCount", tier, unmoved);
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
strategy.emitStats(tier, stats, serverHolderList);
strategy.emitStats(tier, stats, toMoveFrom);
}
log.info(
"[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public void tearDown()
balancerStrategyExecutor.shutdownNow();
}


@Test
public void testMoveToEmptyServerBalancer()
{
Expand All @@ -185,7 +184,7 @@ public void testMoveToEmptyServerBalancer()
)
);

DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
Expand All @@ -196,6 +195,48 @@ public void testMoveToEmptyServerBalancer()
Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}

@Test
public void testMoveMaxLoadQueueServerBalancer()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());

EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);

// Mock stuff that the coordinator needs
mockCoordinator(coordinator);

BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy(
balancerStrategy,
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1),
new BalancerSegmentHolder(druidServer1, segment2),
new BalancerSegmentHolder(druidServer1, segment3),
new BalancerSegmentHolder(druidServer1, segment4)
)
);

DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.withDynamicConfigs(
CoordinatorDynamicConfig
.builder()
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.withMaxSegmentsInNodeLoadingQueue(1)
.build()
)
.build();

params = new DruidCoordinatorBalancerTester(coordinator).run(params);

// max to move is 5, all segments on server 1, but only expect to move 1 to server 2 since max node load queue is 1
Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}

@Test
public void testMoveSameSegmentTwice()
{
Expand All @@ -215,7 +256,7 @@ public void testMoveSameSegmentTwice()
)
);

DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
Expand Down Expand Up @@ -244,7 +285,7 @@ public void testRun1()
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);

DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
).build();
Expand All @@ -253,7 +294,6 @@ public void testRun1()
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
}


@Test
public void testRun2()
{
Expand All @@ -266,13 +306,13 @@ public void testRun2()
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);

DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(druidServers, peons).build();
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons).build();

params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
}

private DruidCoordinatorRuntimeParams.Builder defaullRuntimeParamsBuilder(
private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
List<ImmutableDruidServer> druidServers,
List<LoadQueuePeon> peons
)
Expand Down Expand Up @@ -392,5 +432,4 @@ public void emitStats(
delegate.emitStats(tier, stats, serverHolderList);
}
}

}