From bab239f908f4d3bc4f977d003e5b67c38c4fcac2 Mon Sep 17 00:00:00 2001 From: Han Date: Tue, 11 May 2021 10:14:01 +0800 Subject: [PATCH 1/3] reduce method invocation of reservoir sampling --- .../server/coordinator/BalancerStrategy.java | 47 ++++++++++++++ .../coordinator/ReservoirSegmentSampler.java | 38 ++++++++++++ .../coordinator/duty/BalanceSegments.java | 20 ++++-- .../coordinator/BalanceSegmentsTest.java | 61 +++++++++++++------ .../ReservoirSegmentSamplerTest.java | 8 +-- 5 files changed, 144 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index 3d0ec0f6665b..1fa66b476f63 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -71,6 +71,7 @@ public interface BalancerStrategy * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if * there are no segments to pick from (i. e. all provided serverHolders are empty). */ + @Deprecated @Nullable BalancerSegmentHolder pickSegmentToMove( List serverHolders, @@ -78,6 +79,52 @@ BalancerSegmentHolder pickSegmentToMove( double percentOfSegmentsToConsider ); + /** + * Pick the best segments to move from one of the supplied set of servers according to the balancing strategy. + * @param serverHolders set of historicals to consider for moving segments + * @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules. + * Balancing strategies should avoid rebalancing segments for such datasources, since + * they should be loaded on all servers anyway. + * NOTE: this should really be handled on a per-segment basis, to properly support + * the interval or period-based broadcast rules. 
For simplicity of the initial + * implementation, only forever broadcast rules are supported. + * @param numberOfSegments number of segments to be picked from servers + * @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently + * reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty). + */ + default Iterator pickSegmentsToMove(List serverHolders, + Set broadcastDatasources, + int numberOfSegments) + { + return new Iterator() + { + private Iterator it = ReservoirSegmentSampler.getRandomBalancerSegmentHolders( + serverHolders, + broadcastDatasources, + numberOfSegments + ).iterator(); + @Override + public boolean hasNext() + { + if (it.hasNext()) { + return true; + } + it = ReservoirSegmentSampler.getRandomBalancerSegmentHolders( + serverHolders, + broadcastDatasources, + numberOfSegments + ).iterator(); + return it.hasNext(); + } + + @Override + public BalancerSegmentHolder next() + { + return it.next(); + } + }; + } + /** * Returns an iterator for a set of servers to drop from, ordered by preference of which server to drop from first * for a given drop strategy. 
One or more segments may be dropped, depending on how much the segment is diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java index dd43760ec874..751f2597e6b3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java @@ -22,6 +22,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.timeline.DataSegment; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; @@ -31,6 +32,42 @@ final class ReservoirSegmentSampler private static final EmittingLogger log = new EmittingLogger(ReservoirSegmentSampler.class); + static List getRandomBalancerSegmentHolders( + final List serverHolders, + Set broadcastDatasources, + int k + ) + { + List holders = new ArrayList<>(k); + int numSoFar = 0; + + for (ServerHolder server : serverHolders) { + if (!server.getServer().getType().isSegmentReplicationTarget()) { + // if the server only handles broadcast segments (which don't need to be rebalanced), we have nothing to do + continue; + } + + for (DataSegment segment : server.getServer().iterateAllSegments()) { + if (broadcastDatasources.contains(segment.getDataSource())) { + // we don't need to rebalance segments that were assigned via broadcast rules + continue; + } + + if (numSoFar < k) { + holders.add(new BalancerSegmentHolder(server.getServer(), segment)); + numSoFar++; + continue; + } + int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1); + if (randNum < k) { + holders.set(randNum, new BalancerSegmentHolder(server.getServer(), segment)); + } + numSoFar++; + } + } + return holders; + } + /** * Iterates over segments that live on the candidate servers passed in {@link ServerHolder} and (possibly) picks a * segment 
to return to caller in a {@link BalancerSegmentHolder} object. @@ -42,6 +79,7 @@ final class ReservoirSegmentSampler * returning immediately. * @return */ + @Deprecated static BalancerSegmentHolder getRandomBalancerSegmentHolder( final List serverHolders, Set broadcastDatasources, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index d1fca19e03be..01b326d87807 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -36,6 +36,7 @@ import org.apache.druid.timeline.SegmentId; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -180,22 +181,29 @@ private Pair balanceServers( int maxSegmentsToMove ) { + if (maxSegmentsToMove <= 0) { + return new Pair<>(0, 0); + } + final BalancerStrategy strategy = params.getBalancerStrategy(); final int maxIterations = 2 * maxSegmentsToMove; final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); int moved = 0, unmoved = 0; + Iterator segmetnsToMove = strategy.pickSegmentsToMove( + toMoveFrom, + params.getBroadcastDatasources(), + maxSegmentsToMove + ); + //noinspection ForLoopThatDoesntUseLoopVariable for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { - final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove( - toMoveFrom, - params.getBroadcastDatasources(), - params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove() - ); - if (segmentToMoveHolder == null) { + if (!segmetnsToMove.hasNext()) { log.info("All servers to move segments from are empty, ending run."); break; } + final BalancerSegmentHolder segmentToMoveHolder = segmetnsToMove.next(); + // DruidCoordinatorRuntimeParams.getUsedSegments 
originate from SegmentsMetadataManager, i. e. that's a set of segments // that *should* be loaded. segmentToMoveHolder.getSegment originates from ServerInventoryView, i. e. that may be // any segment that happens to be loaded on some server, even if it is not used. (Coordinator closes such diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index 26175f613216..11a04e237542 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -36,12 +36,14 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.Executors; @@ -233,20 +235,25 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); EasyMock.expect( - strategy.pickSegmentToMove( + strategy.pickSegmentsToMove( ImmutableList.of( new ServerHolder(druidServer2, peon2, false) ), broadcastDatasources, - 100 + 2 ) ).andReturn( - new BalancerSegmentHolder(druidServer2, segment3)).andReturn(new BalancerSegmentHolder(druidServer2, segment4) + ImmutableList.of( + new BalancerSegmentHolder(druidServer2, segment3), + new BalancerSegmentHolder(druidServer2, segment4) + ).iterator() ); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) - .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) - .andReturn(new BalancerSegmentHolder(druidServer1, segment2)); + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyInt())) + .andReturn( + ImmutableList.of( + new BalancerSegmentHolder(druidServer1, segment1), + new BalancerSegmentHolder(druidServer1, segment2)).iterator()); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) @@ -309,11 +316,14 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissi mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) - .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) - .andReturn(new BalancerSegmentHolder(druidServer1, segment2)) - .andReturn(new BalancerSegmentHolder(druidServer2, segment3)) - .andReturn(new BalancerSegmentHolder(druidServer2, segment4)); + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()); + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + .andReturn( + ImmutableList.of( + new BalancerSegmentHolder(druidServer1, segment2), + new BalancerSegmentHolder(druidServer2, segment3), + new BalancerSegmentHolder(druidServer2, segment4)).iterator()); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) @@ -358,8 +368,8 @@ public void testMoveToDecommissioningServer() mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) - .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyInt())) + .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()) .anyTimes(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> { List holders = (List) EasyMock.getCurrentArguments()[1]; @@ -393,8 +403,8 @@ public void testMoveFromDecommissioningServer() ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) - .andReturn(new BalancerSegmentHolder(druidServer1, segment1)) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()) .once(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(holder2) @@ -541,6 +551,7 @@ public void testRun2() * out to pickSegmentToMove. This config limits the number of segments that are considered when looking for a segment * to move. 
*/ + @Ignore @Test public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove() { @@ -715,6 +726,16 @@ public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, Li return delegate.findNewSegmentHomeReplicator(proposalSegment, serverHolders); } + @Override + public Iterator pickSegmentsToMove( + List serverHolders, + Set broadcastDatasources, + int numberOfSegments + ) + { + return pickOrder.iterator(); + } + @Override public BalancerSegmentHolder pickSegmentToMove( List serverHolders, @@ -745,18 +766,18 @@ private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfM // either decommissioning servers list or acitve ones (ie servers list is [2] or [1, 3]) BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); EasyMock.expect( - strategy.pickSegmentToMove( + strategy.pickSegmentsToMove( ImmutableList.of( new ServerHolder(druidServer2, peon2, true) ), broadcastDatasources, - 100 + 1 ) ).andReturn( - new BalancerSegmentHolder(druidServer2, segment2) + ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment2)).iterator() ); - EasyMock.expect(strategy.pickSegmentToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) - .andReturn(new BalancerSegmentHolder(druidServer1, segment1)); + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) .anyTimes(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java index f59f24274038..70086abd772a 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java +++ 
b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java @@ -174,10 +174,10 @@ public void getRandomBalancerSegmentHolderTest() Map segmentCountMap = new HashMap<>(); for (int i = 0; i < iterations; i++) { // due to the pseudo-randomness of this method, we may not select a segment every single time no matter what. - BalancerSegmentHolder balancerSegmentHolder = ReservoirSegmentSampler.getRandomBalancerSegmentHolder(holderList, Collections.emptySet(), 100); - if (balancerSegmentHolder != null) { - segmentCountMap.put(balancerSegmentHolder.getSegment(), 1); - } + segmentCountMap.put( + ReservoirSegmentSampler.getRandomBalancerSegmentHolders(holderList, Collections.emptySet(), 1).get(0).getSegment(), + 1 + ); } for (DataSegment segment : segments) { From 35c0ca42666b8143e7f182c93c2109826a5a68b0 Mon Sep 17 00:00:00 2001 From: Han Date: Mon, 5 Jul 2021 11:37:38 +0800 Subject: [PATCH 2/3] add a dynamic parameter and add benchmark --- .../BalancerStrategyBenchmark.java | 142 ++++++++++++++++++ docs/configuration/index.md | 2 + .../server/coordinator/BalancerStrategy.java | 95 +++++++----- .../coordinator/CoordinatorDynamicConfig.java | 26 ++++ .../coordinator/CostBalancerStrategy.java | 16 -- .../coordinator/RandomBalancerStrategy.java | 15 -- .../coordinator/duty/BalanceSegments.java | 11 +- .../coordinator/BalanceSegmentsTest.java | 67 ++++++--- ...inator-dynamic-config-dialog.spec.tsx.snap | 8 + .../coordinator-dynamic-config.tsx | 13 ++ 10 files changed, 295 insertions(+), 100 deletions(-) create mode 100644 benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java new file mode 100644 index 000000000000..852403d2a214 --- /dev/null +++ 
b/benchmarks/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyBenchmark.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import io.vavr.collection.Stream; +import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import 
org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 10) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class BalancerStrategyBenchmark +{ + private static final Random RANDOM = new Random(0); + private static final Interval TEST_SEGMENT_INTERVAL = Intervals.of("2012-03-15T00:00:00.000/2012-03-16T00:00:00.000"); + private static final int NUMBER_OF_SERVERS = 20; + + @Param({"default", "50percentOfSegmentsToConsiderPerMove", "useBatchedSegmentSampler"}) + private String mode; + + @Param({"10000", "100000", "1000000"}) + private int numberOfSegments; + + @Param({"10", "100", "1000"}) + private int maxSegmentsToMove; + + private final List serverHolders = new ArrayList<>(); + private int reservoirSize = 1; + private double percentOfSegmentsToConsider = 100; + private final BalancerStrategy balancerStrategy = new CostBalancerStrategy( + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)) + ); + + @Setup(Level.Trial) + public void setup() throws IOException + { + switch (mode) { + case "50percentOfSegmentsToConsiderPerMove": + percentOfSegmentsToConsider = 50; + break; + case "useBatchedSegmentSampler": + reservoirSize = maxSegmentsToMove; + break; + default: + } + + List> segmentList = new ArrayList<>(NUMBER_OF_SERVERS); + Stream.range(0, NUMBER_OF_SERVERS).forEach(i -> segmentList.add(new ArrayList<>())); + for (int i = 0; i < numberOfSegments; i++) { + segmentList.get(RANDOM.nextInt(NUMBER_OF_SERVERS)).add( + new DataSegment( + "test", + TEST_SEGMENT_INTERVAL, + String.valueOf(i), + Collections.emptyMap(), + Collections.emptyList(), + 
Collections.emptyList(), + null, + 0, + 10L + ) + ); + } + + for (List segments : segmentList) { + serverHolders.add( + new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("id", "host", null, 10000000L, ServerType.HISTORICAL, "hot", 1), + 3000L, + ImmutableMap.of("test", new ImmutableDruidDataSource("test", Collections.emptyMap(), segments)), + segments.size() + ), + new LoadQueuePeonTester() + ) + ); + } + } + + @Benchmark + public void pickSegmentsToMove(Blackhole blackhole) + { + Iterator iterator = balancerStrategy.pickSegmentsToMove( + serverHolders, + Collections.emptySet(), + reservoirSize, + percentOfSegmentsToConsider + ); + for (int i = 0; i < maxSegmentsToMove && iterator.hasNext(); i++) { + blackhole.consume(iterator.next()); + } + } +} diff --git a/docs/configuration/index.md b/docs/configuration/index.md index a5e9d9aa2390..5cc26687bf29 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -809,6 +809,7 @@ A sample Coordinator dynamic config JSON object is shown below: "mergeBytesLimit": 100000000, "mergeSegmentsLimit" : 1000, "maxSegmentsToMove": 5, + "useBatchedSegmentSampler": false, "percentOfSegmentsToConsiderPerMove": 100, "replicantLifetime": 15, "replicationThrottleLimit": 10, @@ -830,6 +831,7 @@ Issuing a GET request at the same URL will return the spec that is currently in |`mergeBytesLimit`|The maximum total uncompressed size in bytes of segments to merge.|524288000L| |`mergeSegmentsLimit`|The maximum number of segments that can be in a single [append task](../ingestion/tasks.md).|100| |`maxSegmentsToMove`|The maximum number of segments that can be moved at any given time.|5| +|`useBatchedSegmentSampler`|Boolean flag for whether or not we should use Reservoir Sampling with a reservoir of size k instead of a fixed size of 1 to pick segments to move. 
This option can be enabled to speed up the segment balancing process, especially if there is a huge number of segments in the cluster or if a large number of segments need to be moved.|false| |`percentOfSegmentsToConsiderPerMove`|The percentage of the total number of segments in the cluster that are considered every time a segment needs to be selected for a move. Druid orders servers by available capacity ascending (the least available capacity first) and then iterates over the servers. For each server, Druid iterates over the segments on the server, considering them for moving. The default config of 100% means that every segment on every server is a candidate to be moved. This should make sense for most small to medium-sized clusters. However, an admin may find it preferable to drop this value lower if they don't think that it is worthwhile to consider every single segment in the cluster each time it is looking for a segment to move.|100| |`replicantLifetime`|The maximum number of Coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java index 1fa66b476f63..78ab9514ae7e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java @@ -55,7 +55,8 @@ public interface BalancerStrategy ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List serverHolders); /** - * Pick the best segment to move from one of the supplied set of servers according to the balancing strategy. + * Pick the best segments to move from one of the supplied set of servers according to the balancing strategy. 
+ * * @param serverHolders set of historicals to consider for moving segments * @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules. * Balancing strategies should avoid rebalancing segments for such datasources, since @@ -63,64 +64,78 @@ public interface BalancerStrategy * NOTE: this should really be handled on a per-segment basis, to properly support * the interval or period-based broadcast rules. For simplicity of the initial * implementation, only forever broadcast rules are supported. + * @param reservoirSize the reservoir size maintained by the Reservoir Sampling algorithm. * @param percentOfSegmentsToConsider The percentage of the total number of segments that we will consider when * choosing which segment to move. {@link CoordinatorDynamicConfig} defines a * config percentOfSegmentsToConsiderPerMove that will be used as an argument * for implementations of this method. - * - * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if - * there are no segments to pick from (i. e. all provided serverHolders are empty). + * @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently + * reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty). */ - @Deprecated - @Nullable - BalancerSegmentHolder pickSegmentToMove( + default Iterator pickSegmentsToMove( List serverHolders, Set broadcastDatasources, + int reservoirSize, double percentOfSegmentsToConsider - ); - - /** - * Pick the best segments to move from one of the supplied set of servers according to the balancing strategy. - * @param serverHolders set of historicals to consider for moving segments - * @param broadcastDatasources Datasources that contain segments which were loaded via broadcast rules. 
- * Balancing strategies should avoid rebalancing segments for such datasources, since - * they should be loaded on all servers anyway. - * NOTE: this should really be handled on a per-segment basis, to properly support - * the interval or period-based broadcast rules. For simplicity of the initial - * implementation, only forever broadcast rules are supported. - * @param numberOfSegments number of segments to be picked from servers - * @return Iterator for set of {@link BalancerSegmentHolder} containing segment to move and server they currently - * reside on, or empty if there are no segments to pick from (i. e. all provided serverHolders are empty). - */ - default Iterator pickSegmentsToMove(List serverHolders, - Set broadcastDatasources, - int numberOfSegments) + ) { + if (reservoirSize > 1) { + return new Iterator() + { + private Iterator it = sample(); + + private Iterator sample() + { + return ReservoirSegmentSampler.getRandomBalancerSegmentHolders( + serverHolders, + broadcastDatasources, + reservoirSize + ).iterator(); + } + + @Override + public boolean hasNext() + { + if (it.hasNext()) { + return true; + } + it = sample(); + return it.hasNext(); + } + + @Override + public BalancerSegmentHolder next() + { + return it.next(); + } + }; + } + return new Iterator() { - private Iterator it = ReservoirSegmentSampler.getRandomBalancerSegmentHolders( - serverHolders, - broadcastDatasources, - numberOfSegments - ).iterator(); - @Override - public boolean hasNext() + private BalancerSegmentHolder next = sample(); + + private BalancerSegmentHolder sample() { - if (it.hasNext()) { - return true; - } - it = ReservoirSegmentSampler.getRandomBalancerSegmentHolders( + return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( serverHolders, broadcastDatasources, - numberOfSegments - ).iterator(); - return it.hasNext(); + percentOfSegmentsToConsider + ); + } + + @Override + public boolean hasNext() + { + return next != null; } @Override public BalancerSegmentHolder 
next() { - return it.next(); + BalancerSegmentHolder ret = next; + next = sample(); + return ret; } }; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index d125aa4edab5..d4bf3e8b05d8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -55,6 +55,7 @@ public class CoordinatorDynamicConfig private final int mergeSegmentsLimit; private final int maxSegmentsToMove; private final double percentOfSegmentsToConsiderPerMove; + private final boolean useBatchedSegmentSampler; private final int replicantLifetime; private final int replicationThrottleLimit; private final int balancerComputeThreads; @@ -115,6 +116,7 @@ public CoordinatorDynamicConfig( @JsonProperty("mergeSegmentsLimit") int mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") int maxSegmentsToMove, @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, + @JsonProperty("useBatchedSegmentSampler") boolean useBatchedSegmentSampler, @JsonProperty("replicantLifetime") int replicantLifetime, @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, @JsonProperty("balancerComputeThreads") int balancerComputeThreads, @@ -161,6 +163,7 @@ public CoordinatorDynamicConfig( ); this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; + this.useBatchedSegmentSampler = useBatchedSegmentSampler; this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads = Math.max(balancerComputeThreads, 1); @@ -272,6 +275,12 @@ public double getPercentOfSegmentsToConsiderPerMove() return percentOfSegmentsToConsiderPerMove; } + @JsonProperty + public boolean useBatchedSegmentSampler() + { + return 
useBatchedSegmentSampler; + } + @JsonProperty public int getReplicantLifetime() { @@ -377,6 +386,7 @@ public String toString() ", mergeSegmentsLimit=" + mergeSegmentsLimit + ", maxSegmentsToMove=" + maxSegmentsToMove + ", percentOfSegmentsToConsiderPerMove=" + percentOfSegmentsToConsiderPerMove + + ", useBatchedSegmentSampler=" + useBatchedSegmentSampler + ", replicantLifetime=" + replicantLifetime + ", replicationThrottleLimit=" + replicationThrottleLimit + ", balancerComputeThreads=" + balancerComputeThreads + @@ -421,6 +431,9 @@ public boolean equals(Object o) if (percentOfSegmentsToConsiderPerMove != that.percentOfSegmentsToConsiderPerMove) { return false; } + if (useBatchedSegmentSampler != that.useBatchedSegmentSampler) { + return false; + } if (replicantLifetime != that.replicantLifetime) { return false; } @@ -469,6 +482,7 @@ public int hashCode() mergeSegmentsLimit, maxSegmentsToMove, percentOfSegmentsToConsiderPerMove, + useBatchedSegmentSampler, replicantLifetime, replicationThrottleLimit, balancerComputeThreads, @@ -501,6 +515,7 @@ public static class Builder private static final int DEFAULT_REPLICATION_THROTTLE_LIMIT = 10; private static final int DEFAULT_BALANCER_COMPUTE_THREADS = 1; private static final boolean DEFAULT_EMIT_BALANCING_STATS = false; + private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false; private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false; private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0; private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70; @@ -513,6 +528,7 @@ public static class Builder private Integer mergeSegmentsLimit; private Integer maxSegmentsToMove; private Double percentOfSegmentsToConsiderPerMove; + private Boolean useBatchedSegmentSampler; private Integer replicantLifetime; private Integer replicationThrottleLimit; private Boolean emitBalancingStats; @@ -539,6 +555,7 @@ public Builder( 
@JsonProperty("mergeSegmentsLimit") @Nullable Integer mergeSegmentsLimit, @JsonProperty("maxSegmentsToMove") @Nullable Integer maxSegmentsToMove, @JsonProperty("percentOfSegmentsToConsiderPerMove") @Nullable Double percentOfSegmentsToConsiderPerMove, + @JsonProperty("useBatchedSegmentSampler") Boolean useBatchedSegmentSampler, @JsonProperty("replicantLifetime") @Nullable Integer replicantLifetime, @JsonProperty("replicationThrottleLimit") @Nullable Integer replicationThrottleLimit, @JsonProperty("balancerComputeThreads") @Nullable Integer balancerComputeThreads, @@ -561,6 +578,7 @@ public Builder( this.mergeSegmentsLimit = mergeSegmentsLimit; this.maxSegmentsToMove = maxSegmentsToMove; this.percentOfSegmentsToConsiderPerMove = percentOfSegmentsToConsiderPerMove; + this.useBatchedSegmentSampler = useBatchedSegmentSampler; this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.balancerComputeThreads = balancerComputeThreads; @@ -606,6 +624,12 @@ public Builder withPercentOfSegmentsToConsiderPerMove(double percentOfSegmentsTo return this; } + public Builder withUseBatchedSegmentSampler(boolean useBatchedSegmentSampler) + { + this.useBatchedSegmentSampler = useBatchedSegmentSampler; + return this; + } + public Builder withReplicantLifetime(int replicantLifetime) { this.replicantLifetime = replicantLifetime; @@ -689,6 +713,7 @@ public CoordinatorDynamicConfig build() maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : maxSegmentsToMove, percentOfSegmentsToConsiderPerMove == null ? DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE : percentOfSegmentsToConsiderPerMove, + useBatchedSegmentSampler == null ? DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler, replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : replicantLifetime, replicationThrottleLimit == null ? DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit, balancerComputeThreads == null ? 
DEFAULT_BALANCER_COMPUTE_THREADS : balancerComputeThreads, @@ -721,6 +746,7 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit, maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove, percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove, + useBatchedSegmentSampler == null ? defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler, replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime, replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit, balancerComputeThreads == null ? defaults.getBalancerComputeThreads() : balancerComputeThreads, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java index ac56544b0730..8ff4f6a4a23c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java @@ -35,7 +35,6 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; -import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -210,21 +209,6 @@ static double computeJointSegmentsCost(final DataSegment segment, final Iterable return totalCost; } - - @Override - public BalancerSegmentHolder pickSegmentToMove( - final List serverHolders, - Set broadcastDatasources, - double percentOfSegmentsToConsider - ) - { - return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( - serverHolders, - broadcastDatasources, - percentOfSegmentsToConsider - ); - } - @Override public Iterator pickServersToDrop(DataSegment toDrop, NavigableSet serverHolders) { diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index 8f3b96d67ab2..d70d85016c3d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -26,7 +26,6 @@ import java.util.Iterator; import java.util.List; import java.util.NavigableSet; -import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -53,20 +52,6 @@ public ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment, List return null; //To change body of implemented methods use File | Settings | File Templates. } - @Override - public BalancerSegmentHolder pickSegmentToMove( - List serverHolders, - Set broadcastDatasources, - double percentOfSegmentsToConsider - ) - { - return ReservoirSegmentSampler.getRandomBalancerSegmentHolder( - serverHolders, - broadcastDatasources, - percentOfSegmentsToConsider - ); - } - @Override public Iterator pickServersToDrop(DataSegment toDropSegment, NavigableSet serverHolders) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java index 01b326d87807..218cf696b7ad 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java @@ -56,6 +56,8 @@ public class BalanceSegments implements CoordinatorDuty protected final Map> currentlyMovingSegments = new HashMap<>(); + private static final int DEFAULT_RESERVOIR_SIZE = 1; + public BalanceSegments(DruidCoordinator coordinator) { this.coordinator = coordinator; @@ -190,19 +192,20 @@ private Pair balanceServers( final int maxToLoad = 
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue(); int moved = 0, unmoved = 0; - Iterator segmetnsToMove = strategy.pickSegmentsToMove( + Iterator segmentsToMove = strategy.pickSegmentsToMove( toMoveFrom, params.getBroadcastDatasources(), - maxSegmentsToMove + params.getCoordinatorDynamicConfig().useBatchedSegmentSampler() ? maxSegmentsToMove : DEFAULT_RESERVOIR_SIZE, + params.getCoordinatorDynamicConfig().getPercentOfSegmentsToConsiderPerMove() ); //noinspection ForLoopThatDoesntUseLoopVariable for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) { - if (!segmetnsToMove.hasNext()) { + if (!segmentsToMove.hasNext()) { log.info("All servers to move segments from are empty, ending run."); break; } - final BalancerSegmentHolder segmentToMoveHolder = segmetnsToMove.next(); + final BalancerSegmentHolder segmentToMoveHolder = segmentsToMove.next(); // DruidCoordinatorRuntimeParams.getUsedSegments originate from SegmentsMetadataManager, i. e. that's a set of segments // that *should* be loaded. segmentToMoveHolder.getSegment originates from ServerInventoryView, i. e. 
that may be diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java index 11a04e237542..76b11040aad9 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java @@ -36,7 +36,6 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; @@ -240,7 +239,8 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() new ServerHolder(druidServer2, peon2, false) ), broadcastDatasources, - 2 + 1, + 100 ) ).andReturn( ImmutableList.of( @@ -249,7 +249,7 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove() ).iterator() ); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt())) .andReturn( ImmutableList.of( new BalancerSegmentHolder(druidServer1, segment1), @@ -316,9 +316,9 @@ public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissi mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt())) .andReturn( 
ImmutableList.of( new BalancerSegmentHolder(druidServer1, segment2), @@ -368,7 +368,7 @@ public void testMoveToDecommissioningServer() mockCoordinator(coordinator); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()) .anyTimes(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())).andAnswer(() -> { @@ -403,7 +403,7 @@ public void testMoveFromDecommissioningServer() ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false); BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()) .once(); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) @@ -551,7 +551,6 @@ public void testRun2() * out to pickSegmentToMove. This config limits the number of segments that are considered when looking for a segment * to move. 
*/ - @Ignore @Test public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove() { @@ -567,27 +566,29 @@ public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove() // The first call for decommissioning servers EasyMock.expect( - strategy.pickSegmentToMove( + strategy.pickSegmentsToMove( ImmutableList.of(), broadcastDatasources, + 1, 40 ) ) - .andReturn(null); + .andReturn(Collections.emptyIterator()); // The second call for the single non decommissioning server move EasyMock.expect( - strategy.pickSegmentToMove( + strategy.pickSegmentsToMove( ImmutableList.of( new ServerHolder(druidServer3, peon3, false), new ServerHolder(druidServer2, peon2, false), new ServerHolder(druidServer1, peon1, false) ), broadcastDatasources, + 1, 40 ) ) - .andReturn(new BalancerSegmentHolder(druidServer2, segment3)); + .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment3)).iterator()); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) @@ -617,6 +618,30 @@ public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove() ); } + @Test + public void testUseBatchedSegmentSampler() + { + mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments); + mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyList()); + mockDruidServer(druidServer3, "3", "normal", 0L, 100L, Collections.emptyList()); + mockDruidServer(druidServer4, "4", "normal", 0L, 100L, Collections.emptyList()); + + mockCoordinator(coordinator); + + DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons) + .withDynamicConfigs( + CoordinatorDynamicConfig.builder() + .withMaxSegmentsToMove(2) + .withUseBatchedSegmentSampler(true) + .build() + ) + .withBroadcastDatasources(broadcastDatasources) + .build(); + + params = new BalanceSegmentsTester(coordinator).run(params); + Assert.assertEquals(2L, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal")); + } + private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder( List druidServers, List peons @@ -730,20 +755,11 @@ public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, Li public Iterator pickSegmentsToMove( List serverHolders, Set broadcastDatasources, - int numberOfSegments - ) - { - return pickOrder.iterator(); - } - - @Override - public BalancerSegmentHolder pickSegmentToMove( - List serverHolders, - Set broadcastDatasources, + int numberOfSegments, double percentOfSegmentsToConsider ) { - return pickOrder.get(pickCounter.getAndIncrement() % pickOrder.size()); + return pickOrder.iterator(); } @Override @@ -771,12 +787,13 @@ private DruidCoordinatorRuntimeParams setupParamsForDecommissioningMaxPercentOfM new ServerHolder(druidServer2, peon2, true) ), broadcastDatasources, - 1 + 1, + 100 ) ).andReturn( ImmutableList.of(new BalancerSegmentHolder(druidServer2, segment2)).iterator() ); - EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt())) + EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyDouble())) .andReturn(ImmutableList.of(new BalancerSegmentHolder(druidServer1, segment1)).iterator()); EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), EasyMock.anyObject())) .andReturn(new ServerHolder(druidServer3, peon3)) diff --git a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap index 16f03866309a..b799bb5f7809 100644 --- a/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap +++ 
b/web-console/src/dialogs/coordinator-dynamic-config-dialog/__snapshots__/coordinator-dynamic-config-dialog.spec.tsx.snap @@ -168,6 +168,14 @@ exports[`coordinator dynamic config matches snapshot 1`] = ` "name": "decommissioningMaxPercentOfMaxSegmentsToMove", "type": "number", }, + Object { + "defaultValue": false, + "info": + Boolean flag for whether or not we should use the Reservoir Sampling with a reservoir of size k instead of fixed size 1 to pick segments to move. This option can be enabled to speed up the segment balancing process, especially if there is a huge number of segments in the cluster or if there are too many segments to move. + , + "name": "useBatchedSegmentSampler", + "type": "boolean", + }, Object { "defaultValue": 100, "info": diff --git a/web-console/src/druid-models/coordinator-dynamic-config.tsx b/web-console/src/druid-models/coordinator-dynamic-config.tsx index 4ee4296032e5..ee519ca49c64 100644 --- a/web-console/src/druid-models/coordinator-dynamic-config.tsx +++ b/web-console/src/druid-models/coordinator-dynamic-config.tsx @@ -194,6 +194,19 @@ export const COORDINATOR_DYNAMIC_CONFIG_FIELDS: Field[ ), }, + { + name: 'useBatchedSegmentSampler', + type: 'boolean', + defaultValue: false, + info: ( + <> + Boolean flag for whether or not we should use the Reservoir Sampling with a reservoir of + size k instead of fixed size 1 to pick segments to move. This option can be enabled to speed + up the segment balancing process, especially if there is a huge number of segments in the + cluster or if there are too many segments to move.
+ + ), + }, { name: 'percentOfSegmentsToConsiderPerMove', type: 'number', From b02a9e659846a88c7be375ffb805340c38d1dbf5 Mon Sep 17 00:00:00 2001 From: Han Date: Wed, 7 Jul 2021 18:04:43 +0800 Subject: [PATCH 3/3] rebase --- .../apache/druid/server/http/CoordinatorDynamicConfigTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index e02208274d83..b6776a8d578b 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -389,7 +389,7 @@ public void testUpdate() Assert.assertEquals( current, new CoordinatorDynamicConfig - .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) + .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null) .build(current) ); }