Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package io.druid.server.coordinator.helper;

import com.google.common.collect.Lists;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.server.coordinator.BalancerSegmentHolder;
import io.druid.server.coordinator.BalancerStrategy;
import io.druid.server.coordinator.CoordinatorStats;
Expand Down Expand Up @@ -103,31 +103,37 @@ private void balanceTier(
return;
}

final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
final List<ServerHolder> toMoveFrom = Lists.newArrayList(servers);
final List<ServerHolder> toMoveTo = Lists.newArrayList(servers);

if (serverHolderList.size() <= 1) {
if (toMoveTo.size() <= 1) {
log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
return;
}

int numSegments = 0;
for (ServerHolder server : serverHolderList) {
numSegments += server.getServer().getSegments().size();
for (ServerHolder sourceHolder : toMoveFrom) {
numSegments += sourceHolder.getServer().getSegments().size();
}

if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
return;
}

final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
long unmoved = 0L;
for (int iter = 0; iter < maxSegmentsToMove; iter++) {
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
if (maxToLoad > 0) {
toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad);
}
final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom);

if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
final ServerHolder destinationHolder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo);

if (holder != null) {
moveSegment(segmentToMove, holder.getServer(), params);
if (destinationHolder != null) {
moveSegment(segmentToMove, destinationHolder.getServer(), params);
} else {
++unmoved;
}
Expand All @@ -140,7 +146,7 @@ private void balanceTier(
stats.addToTieredStat("unmovedCount", tier, unmoved);
stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
strategy.emitStats(tier, stats, serverHolderList);
strategy.emitStats(tier, stats, toMoveFrom);
}
log.info(
"[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ public void tearDown() throws Exception
balancerStrategyExecutor.shutdownNow();
}


@Test
public void testMoveToEmptyServerBalancer() throws IOException
{
Expand All @@ -186,7 +185,7 @@ public void testMoveToEmptyServerBalancer() throws IOException
)
);

DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
Expand All @@ -197,6 +196,48 @@ public void testMoveToEmptyServerBalancer() throws IOException
Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}

@Test
public void testMoveMaxLoadQueueServerBalancer()
{
mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());

EasyMock.replay(druidServer3);
EasyMock.replay(druidServer4);

// Mock stuff that the coordinator needs
mockCoordinator(coordinator);

BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy(
balancerStrategy,
ImmutableList.of(
new BalancerSegmentHolder(druidServer1, segment1),
new BalancerSegmentHolder(druidServer1, segment2),
new BalancerSegmentHolder(druidServer1, segment3),
new BalancerSegmentHolder(druidServer1, segment4)
)
);

DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
.withBalancerStrategy(predefinedPickOrderStrategy)
.withDynamicConfigs(
CoordinatorDynamicConfig
.builder()
.withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
.withMaxSegmentsInNodeLoadingQueue(1)
.build()
)
.build();

params = new DruidCoordinatorBalancerTester(coordinator).run(params);

// max to move is 5, all segments on server 1, but only expect to move 1 to server 2 since max node load queue is 1
Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
}

@Test
public void testMoveSameSegmentTwice() throws Exception
{
Expand All @@ -216,7 +257,7 @@ public void testMoveSameSegmentTwice() throws Exception
)
);

DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
)
Expand Down Expand Up @@ -245,7 +286,7 @@ public void testRun1() throws IOException
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);

DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
ImmutableList.of(druidServer1, druidServer2),
ImmutableList.of(peon1, peon2)
).build();
Expand All @@ -254,7 +295,6 @@ public void testRun1() throws IOException
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
}


@Test
public void testRun2() throws IOException
{
Expand All @@ -267,13 +307,13 @@ public void testRun2() throws IOException
// Mock stuff that the coordinator needs
mockCoordinator(coordinator);

DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(druidServers, peons).build();
DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons).build();

params = new DruidCoordinatorBalancerTester(coordinator).run(params);
Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
}

private DruidCoordinatorRuntimeParams.Builder defaullRuntimeParamsBuilder(
private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
List<ImmutableDruidServer> druidServers,
List<LoadQueuePeon> peons
)
Expand Down Expand Up @@ -393,5 +433,4 @@ public void emitStats(
delegate.emitStats(tier, stats, serverHolderList);
}
}

}