From 043091e10a98bc3cd2cd6200a49d4c9a796105a3 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sun, 16 Jul 2023 21:22:27 +0530 Subject: [PATCH 1/3] Add test for ReservoirSegmentSampler --- .../balancer/ReservoirSegmentSamplerTest.java | 62 +++++++++++++++++-- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java index 25b180154688..9bf71fd25290 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java @@ -19,7 +19,9 @@ package org.apache.druid.server.coordinator.balancer; +import com.google.common.collect.Lists; import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CreateDataSegments; @@ -41,6 +43,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; public class ReservoirSegmentSamplerTest { @@ -216,21 +219,72 @@ public void testBroadcastSegmentsAreIgnored() } } + @Test + public void testSegmentsFromAllServersAreEquallyLikelyToBePicked() + { + // Create 4 servers, each having an equal number of segments + final List> subSegmentLists = Lists.partition(segments, segments.size() / 4); + final List servers = IntStream.range(0, 4).mapToObj( + i -> createHistorical("server_" + i, subSegmentLists.get(i).toArray(new DataSegment[0])) + ).collect(Collectors.toList()); + + // Get the distribution of picked segments for different sample percentage + final int[] samplePercentages = {50, 20, 10, 5}; + for (int samplePercentage : samplePercentages) { + final int numSegmentsToPick = (int) (segments.size() * samplePercentage / 100.0); + final int[] numSegmentsPickedFromServer = new int[servers.size()]; + int totalSegmentsPicked = 0; + + // Perform a few picking iterations + for (int i = 0; i < 50; ++i) { + List pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom( + servers, + numSegmentsToPick, + ServerHolder::getServedSegments, + Collections.emptySet() + ); + totalSegmentsPicked += pickedSegments.size(); + + // Get the number of segments picked from each server + for (BalancerSegmentHolder pickedSegment : pickedSegments) { + int serverIndex = servers.indexOf(pickedSegment.getServer()); + numSegmentsPickedFromServer[serverIndex]++; + } + } + + // Segments picked from each server are always 24-26% of total + final int expectedMin = (int) (totalSegmentsPicked * 0.24); + final int expectedMax = (int) (totalSegmentsPicked * 0.26); + for (int numPickedFromEachServer : numSegmentsPickedFromServer) { + Assert.assertTrue( + StringUtils.format( + "Mismatch in picked segments for sample percentage [%d], value [%s], expected range [%d-%d]", + samplePercentage, numPickedFromEachServer, expectedMin, expectedMax + ), + numPickedFromEachServer >= expectedMin && numPickedFromEachServer <= expectedMax + ); + } + } + } + @Test(timeout = 60_000) - public void testNumberOfIterationsToCycleThroughAllSegments() + public void testNumberOfSamplingsRequiredToPickAllSegments() { - // The number of runs required for each sample percentage + // The number of sampling iterations required for each sample percentage // remains more or less fixed, even with a larger number of segments final int[] samplePercentages = {100, 50, 10, 5, 1}; final int[] expectedIterations = {1, 20, 100, 200, 1000}; final int[] totalObservedIterations = new int[5]; + + // For every sample percentage, count the minimum number of required samplings for (int i = 0; i < 50; ++i) { for (int j = 0; j < samplePercentages.length; ++j) { - totalObservedIterations[j] += countMinRunsWithSamplePercent(samplePercentages[j]); + totalObservedIterations[j] += countMinRunsToPickAllSegments(samplePercentages[j]); } } + // Compute the avg value from the 50 observations for each sample percentage for (int j = 0; j < samplePercentages.length; ++j) { double avgObservedIterations = totalObservedIterations[j] / 50.0; Assert.assertTrue(avgObservedIterations <= expectedIterations[j]); @@ -244,7 +298,7 @@ public void testNumberOfIterationsToCycleThroughAllSegments() *

* {@code k = sampleSize = totalNumSegments * samplePercentage} */ - private int countMinRunsWithSamplePercent(int samplePercentage) + private int countMinRunsToPickAllSegments(int samplePercentage) { final int numSegments = segments.size(); final List servers = Arrays.asList( From 838c678825bae611c2567c5b7be22f903f70cddc Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 17 Jul 2023 10:19:14 +0530 Subject: [PATCH 2/3] Add one more test --- .../balancer/ReservoirSegmentSamplerTest.java | 106 ++++++++++++++---- 1 file changed, 82 insertions(+), 24 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java index 9bf71fd25290..cde883b81d0d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -228,33 +229,19 @@ public void testSegmentsFromAllServersAreEquallyLikelyToBePicked() i -> createHistorical("server_" + i, subSegmentLists.get(i).toArray(new DataSegment[0])) ).collect(Collectors.toList()); - // Get the distribution of picked segments for different sample percentage + // Get the distribution of picked segments for different sample percentages final int[] samplePercentages = {50, 20, 10, 5}; for (int samplePercentage : samplePercentages) { - final int numSegmentsToPick = (int) (segments.size() * samplePercentage / 100.0); - final int[] numSegmentsPickedFromServer = new int[servers.size()]; - int totalSegmentsPicked = 0; - - // Perform a few picking iterations - for (int i = 0; i < 50; ++i) { - List pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom( - servers, - numSegmentsToPick, - ServerHolder::getServedSegments, - Collections.emptySet() - ); - totalSegmentsPicked += pickedSegments.size(); - - // Get the number of segments picked from each server - for (BalancerSegmentHolder pickedSegment : pickedSegments) { - int serverIndex = servers.indexOf(pickedSegment.getServer()); - numSegmentsPickedFromServer[serverIndex]++; - } - } + final int[] numSegmentsPickedFromServer = pickSegmentsAndGetTotalPickedFromEachServer( + servers, + samplePercentage, + 50 + ); - // Segments picked from each server are always 24-26% of total - final int expectedMin = (int) (totalSegmentsPicked * 0.24); - final int expectedMax = (int) (totalSegmentsPicked * 0.26); + // Segments picked from each server are always 23-27% of total + final int totalSegmentsPicked = Arrays.stream(numSegmentsPickedFromServer).sum(); + final int expectedMin = (int) (totalSegmentsPicked * 0.23); + final int expectedMax = (int) (totalSegmentsPicked * 0.27); for (int numPickedFromEachServer : numSegmentsPickedFromServer) { Assert.assertTrue( StringUtils.format( @@ -267,6 +254,45 @@ public void testSegmentsFromAllServersAreEquallyLikelyToBePicked() } } + @Test + public void testSegmentsFromMorePopulousServerAreMoreLikelyToBePicked() + { + // Create 4 servers, first one having twice as many segments as the rest + final List> subSegmentLists = Lists.partition(segments, segments.size() / 5); + + final List servers = new ArrayList<>(); + List segmentsForServer0 = new ArrayList<>(subSegmentLists.get(0)); + segmentsForServer0.addAll(subSegmentLists.get(1)); + servers.add(createHistorical("server_" + 0, segmentsForServer0)); + + IntStream.range(1, 4).mapToObj( + i -> createHistorical("server_" + i, subSegmentLists.get(i + 1)) + ).forEach(servers::add); + + final int[] samplePercentages = {50, 20, 10, 5}; + for (int samplePercentage : samplePercentages) { + final int[] numSegmentsPickedFromServer = pickSegmentsAndGetTotalPickedFromEachServer( + servers, + samplePercentage, + 50 + ); + + // Segments picked from server0 are always 38-42% of total + final int totalSegmentsPicked = Arrays.stream(numSegmentsPickedFromServer).sum(); + final int expectedMin = (int) (totalSegmentsPicked * 0.38); + final int expectedMax = (int) (totalSegmentsPicked * 0.42); + + int numPickedFromServer0 = numSegmentsPickedFromServer[0]; + Assert.assertTrue( + StringUtils.format( + "Mismatch in picked segments for sample percentage [%d], value [%s], expected range [%d-%d]", + samplePercentage, numPickedFromServer0, expectedMin, expectedMax + ), + numPickedFromServer0 >= expectedMin && numPickedFromServer0 <= expectedMax + ); + } + } + @Test(timeout = 60_000) public void testNumberOfSamplingsRequiredToPickAllSegments() { @@ -324,6 +350,38 @@ private int countMinRunsToPickAllSegments(int samplePercentage) return numIterations; } + private int[] pickSegmentsAndGetTotalPickedFromEachServer( + List servers, + int samplePercentage, + int numIterations + ) + { + final int numSegmentsToPick = (int) (segments.size() * samplePercentage / 100.0); + final int[] numSegmentsPickedFromServer = new int[servers.size()]; + + for (int i = 0; i < numIterations; ++i) { + List pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom( + servers, + numSegmentsToPick, + ServerHolder::getServedSegments, + Collections.emptySet() + ); + + // Get the number of segments picked from each server + for (BalancerSegmentHolder pickedSegment : pickedSegments) { + int serverIndex = servers.indexOf(pickedSegment.getServer()); + numSegmentsPickedFromServer[serverIndex]++; + } + } + + return numSegmentsPickedFromServer; + } + + private ServerHolder createHistorical(String serverName, List loadedSegments) + { + return createHistorical(serverName, loadedSegments.toArray(new DataSegment[0])); + } + private ServerHolder createHistorical(String serverName, DataSegment... loadedSegments) { final DruidServer server = From 3fa8e236c5ca6c2d82216761a845e41d01e452a0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 17 Jul 2023 15:06:26 +0530 Subject: [PATCH 3/3] Clean up and add comments --- .../balancer/ReservoirSegmentSamplerTest.java | 87 +++++++++---------- 1 file changed, 39 insertions(+), 48 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java index cde883b81d0d..e6387a5a3438 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/balancer/ReservoirSegmentSamplerTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.Lists; import org.apache.druid.client.DruidServer; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CreateDataSegments; @@ -35,14 +34,12 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -59,9 +56,6 @@ public class ReservoirSegmentSamplerTest .withNumPartitions(10) .eachOfSizeInMb(100); - private final Function> GET_SERVED_SEGMENTS - = serverHolder -> serverHolder.getServer().iterateAllSegments(); - @Before public void setUp() { @@ -84,7 +78,7 @@ public void testEverySegmentGetsPickedAtleastOnce() // due to the pseudo-randomness of this method, we may not select a segment every single time no matter what. segmentCountMap.compute( ReservoirSegmentSampler - .pickMovableSegmentsFrom(servers, 1, GET_SERVED_SEGMENTS, Collections.emptySet()) + .pickMovableSegmentsFrom(servers, 1, ServerHolder::getServedSegments, Collections.emptySet()) .get(0).getSegment(), (segment, count) -> count == null ? 1 : count + 1 ); @@ -155,9 +149,16 @@ public void testPickLoadingOrLoadedSegments() Assert.assertTrue(pickedSegments.containsAll(loadingSegments)); // Pick only loaded segments - pickedSegments = ReservoirSegmentSampler - .pickMovableSegmentsFrom(Arrays.asList(server1, server2), 10, GET_SERVED_SEGMENTS, Collections.emptySet()) - .stream().map(BalancerSegmentHolder::getSegment).collect(Collectors.toSet()); + List pickedHolders = ReservoirSegmentSampler.pickMovableSegmentsFrom( + Arrays.asList(server1, server2), + 10, + ServerHolder::getServedSegments, + Collections.emptySet() + ); + pickedSegments = pickedHolders + .stream() + .map(BalancerSegmentHolder::getSegment) + .collect(Collectors.toSet()); // Verify that only loaded segments are picked Assert.assertEquals(loadedSegments.size(), pickedSegments.size()); @@ -181,7 +182,7 @@ public void testSegmentsOnBrokersAreIgnored() List pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom( Arrays.asList(historical, broker), 10, - GET_SERVED_SEGMENTS, + ServerHolder::getServedSegments, Collections.emptySet() ); @@ -210,8 +211,12 @@ public void testBroadcastSegmentsAreIgnored() ); // Try to pick all the segments on the servers - List pickedSegments = ReservoirSegmentSampler - .pickMovableSegmentsFrom(servers, 10, GET_SERVED_SEGMENTS, Collections.singleton(broadcastDatasource)); + List pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom( + servers, + 10, + ServerHolder::getServedSegments, + Collections.singleton(broadcastDatasource) + ); // Verify that none of the broadcast segments are picked Assert.assertEquals(2, pickedSegments.size()); @@ -232,24 +237,16 @@ public void testSegmentsFromAllServersAreEquallyLikelyToBePicked() // Get the distribution of picked segments for different sample percentages final int[] samplePercentages = {50, 20, 10, 5}; for (int samplePercentage : samplePercentages) { - final int[] numSegmentsPickedFromServer = pickSegmentsAndGetTotalPickedFromEachServer( - servers, - samplePercentage, - 50 - ); + final int[] numSegmentsPickedFromServer + = pickSegmentsAndGetPickedCountPerServer(servers, samplePercentage, 50); - // Segments picked from each server are always 23-27% of total final int totalSegmentsPicked = Arrays.stream(numSegmentsPickedFromServer).sum(); - final int expectedMin = (int) (totalSegmentsPicked * 0.23); - final int expectedMax = (int) (totalSegmentsPicked * 0.27); - for (int numPickedFromEachServer : numSegmentsPickedFromServer) { - Assert.assertTrue( - StringUtils.format( - "Mismatch in picked segments for sample percentage [%d], value [%s], expected range [%d-%d]", - samplePercentage, numPickedFromEachServer, expectedMin, expectedMax - ), - numPickedFromEachServer >= expectedMin && numPickedFromEachServer <= expectedMax - ); + + // Number of segments picked from each server is ~25% of total + final double expectedPickedSegments = totalSegmentsPicked * 0.25; + final double error = totalSegmentsPicked * 0.02; + for (int pickedSegments : numSegmentsPickedFromServer) { + Assert.assertEquals(expectedPickedSegments, pickedSegments, error); } } } @@ -271,25 +268,19 @@ public void testSegmentsFromMorePopulousServerAreMoreLikelyToBePicked() final int[] samplePercentages = {50, 20, 10, 5}; for (int samplePercentage : samplePercentages) { - final int[] numSegmentsPickedFromServer = pickSegmentsAndGetTotalPickedFromEachServer( - servers, - samplePercentage, - 50 - ); + final int[] numSegmentsPickedFromServer + = pickSegmentsAndGetPickedCountPerServer(servers, samplePercentage, 50); - // Segments picked from server0 are always 38-42% of total final int totalSegmentsPicked = Arrays.stream(numSegmentsPickedFromServer).sum(); - final int expectedMin = (int) (totalSegmentsPicked * 0.38); - final int expectedMax = (int) (totalSegmentsPicked * 0.42); - - int numPickedFromServer0 = numSegmentsPickedFromServer[0]; - Assert.assertTrue( - StringUtils.format( - "Mismatch in picked segments for sample percentage [%d], value [%s], expected range [%d-%d]", - samplePercentage, numPickedFromServer0, expectedMin, expectedMax - ), - numPickedFromServer0 >= expectedMin && numPickedFromServer0 <= expectedMax - ); + + // Number of segments picked from server0 are ~40% of total and + // number of segments picked from other servers are each ~20% of total + double error = totalSegmentsPicked * 0.02; + Assert.assertEquals(totalSegmentsPicked * 0.40, numSegmentsPickedFromServer[0], error); + + for (int serverId = 1; serverId < servers.size(); ++serverId) { + Assert.assertEquals(totalSegmentsPicked * 0.20, numSegmentsPickedFromServer[serverId], error); + } } } @@ -339,7 +330,7 @@ private int countMinRunsToPickAllSegments(int samplePercentage) int numIterations = 1; for (; numIterations < 10000; ++numIterations) { ReservoirSegmentSampler - .pickMovableSegmentsFrom(servers, sampleSize, GET_SERVED_SEGMENTS, Collections.emptySet()) + .pickMovableSegmentsFrom(servers, sampleSize, ServerHolder::getServedSegments, Collections.emptySet()) .forEach(holder -> pickedSegments.add(holder.getSegment())); if (pickedSegments.size() >= numSegments) { @@ -350,7 +341,7 @@ private int countMinRunsToPickAllSegments(int samplePercentage) return numIterations; } - private int[] pickSegmentsAndGetTotalPickedFromEachServer( + private int[] pickSegmentsAndGetPickedCountPerServer( List servers, int samplePercentage, int numIterations