From b27535755dff667765591e4382442157e7b717bb Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Mon, 22 Jun 2020 15:21:53 +0530 Subject: [PATCH 1/6] fix server overassignment --- .../coordinator/CostBalancerStrategy.java | 9 +- .../coordinator/RandomBalancerStrategy.java | 11 ++- .../coordinator/BalancerStrategyTest.java | 99 +++++++++++++++++++ 3 files changed, 114 insertions(+), 5 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java index 5d656d643f99..e5e3cb57b129 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java @@ -367,7 +367,8 @@ protected Pair chooseBestServer( final boolean includeCurrentServer ) { - Pair bestServer = Pair.of(Double.POSITIVE_INFINITY, null); + final Pair noServer = Pair.of(Double.POSITIVE_INFINITY, null); + Pair bestServer = noServer; List>> futures = new ArrayList<>(); @@ -391,7 +392,11 @@ protected Pair chooseBestServer( bestServers.add(server); } } - + // If the best server list contains server whose cost of serving the segment is INFINITE then this means + // no usable servers are found so return a null server so that segment assignment does not happen + if (bestServers.get(0).lhs.isInfinite()) { + return noServer; + } // Randomly choose a server from the best servers bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size())); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index 72fdedf6e453..90590021527c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -28,18 +28,23 @@ import java.util.NavigableSet; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; public class RandomBalancerStrategy implements BalancerStrategy { @Override public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List serverHolders) { - if (serverHolders.size() == 1) { + // filter out servers whose avaialable size is less than required to serve this segment + final List usableServerHolders = serverHolders.stream().filter( + serverHolder -> serverHolder.getAvailableSize() >= proposalSegment.getSize() + ).collect(Collectors.toList()); + if (usableServerHolders.size() <= 1) { return null; } else { - ServerHolder holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size())); + ServerHolder holder = usableServerHolders.get(ThreadLocalRandom.current().nextInt(usableServerHolders.size())); while (holder.isServingSegment(proposalSegment)) { - holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size())); + holder = usableServerHolders.get(ThreadLocalRandom.current().nextInt(usableServerHolders.size())); } return holder; } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java new file mode 100644 index 000000000000..48c019f0d873 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.server.coordination.ServerType; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NoneShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +@RunWith(Parameterized.class) +public class BalancerStrategyTest +{ + private final BalancerStrategy balancerStrategy; + private DataSegment proposedDataSegment; + private List serverHolders; + + @Parameterized.Parameters(name = "{index}: BalancerStrategy:{0}") + public static Iterable data() + { + return Arrays.asList( + new Object[][]{ + {new CostBalancerStrategy(MoreExecutors.sameThreadExecutor())}, + {new RandomBalancerStrategy()} + } + ); + } + + public BalancerStrategyTest(BalancerStrategy balancerStrategy) + { + this.balancerStrategy = balancerStrategy; + } + + @Before + public void setUp() throws Exception + { + this.proposedDataSegment = new DataSegment( + "datasource1", + new Interval(0, 1), + "", + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + 0, + 11L + ); + final ServerHolder serverHolder = new ServerHolder(new ImmutableDruidServer( + new DruidServerMetadata( + "server1", + "localhost:8081", + null, + 10L, + ServerType.HISTORICAL, + "_default_tier", + 0 + ), 0L, ImmutableMap.of(), 0), new LoadQueuePeonTester()); + serverHolders = new ArrayList<>(); + serverHolders.add(serverHolder); + } + + @Test + public void findNewSegmentHomeReplicatorNotEnoughSpace() + { + final ServerHolder serverHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders); + // since there is not enough space on server having avaialable size 10L to host a segment of size 11L, it should be null + Assert.assertNull(serverHolder); + } +} From 9bd42e98754c9f82bf46bc18256018ec9e64b285 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Wed, 24 Jun 2020 18:23:16 +0530 Subject: [PATCH 2/6] fix random balancer strategy, add more tests --- .../coordinator/RandomBalancerStrategy.java | 14 +- .../coordinator/BalancerStrategyTest.java | 61 +++-- .../server/coordinator/RunRulesTest.java | 208 +++++++++++++++++- 3 files changed, 254 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index 90590021527c..491c162c8754 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -35,18 +35,16 @@ public class RandomBalancerStrategy implements BalancerStrategy @Override public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List serverHolders) { - // filter out servers whose avaialable size is less than required to serve this segment + // filter out servers whose avaialable size is less than required to serve this segment and those who are already + // serving this segment final List usableServerHolders = serverHolders.stream().filter( - serverHolder -> serverHolder.getAvailableSize() >= proposalSegment.getSize() + serverHolder -> serverHolder.getAvailableSize() >= proposalSegment.getSize() && !serverHolder.isServingSegment( + proposalSegment) ).collect(Collectors.toList()); - if (usableServerHolders.size() <= 1) { + if (usableServerHolders.size() == 0) { return null; } else { - ServerHolder holder = usableServerHolders.get(ThreadLocalRandom.current().nextInt(usableServerHolders.size())); - while (holder.isServingSegment(proposalSegment)) { - holder = usableServerHolders.get(ThreadLocalRandom.current().nextInt(usableServerHolders.size())); - } - return holder; + return usableServerHolders.get(ThreadLocalRandom.current().nextInt(usableServerHolders.size())); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java index 48c019f0d873..76c8abcbbae3 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java @@ -19,10 +19,8 @@ package org.apache.druid.server.coordinator; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; -import org.apache.druid.client.ImmutableDruidServer; -import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.client.DruidServer; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; @@ -62,7 +60,7 @@ public BalancerStrategyTest(BalancerStrategy balancerStrategy) } @Before - public void setUp() throws Exception + public void setUp() { this.proposedDataSegment = new DataSegment( "datasource1", @@ -75,25 +73,52 @@ public void setUp() throws Exception 0, 11L ); - final ServerHolder serverHolder = new ServerHolder(new ImmutableDruidServer( - new DruidServerMetadata( - "server1", - "localhost:8081", - null, - 10L, - ServerType.HISTORICAL, - "_default_tier", - 0 - ), 0L, ImmutableMap.of(), 0), new LoadQueuePeonTester()); - serverHolders = new ArrayList<>(); - serverHolders.add(serverHolder); } + @Test public void findNewSegmentHomeReplicatorNotEnoughSpace() { - final ServerHolder serverHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders); + final ServerHolder serverHolder = new ServerHolder( + new DruidServer("server1", "host1", null, 10L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(), + new LoadQueuePeonTester()); + serverHolders = new ArrayList<>(); + serverHolders.add(serverHolder); + final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders); // since there is not enough space on server having avaialable size 10L to host a segment of size 11L, it should be null - Assert.assertNull(serverHolder); + Assert.assertNull(foundServerHolder); + } + + @Test(timeout = 5000L) + public void findNewSegmentHomeReplicatorNotEnoughNodesForReplication() + { + final ServerHolder serverHolder1 = new ServerHolder( + new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(), + new LoadQueuePeonTester()); + + final ServerHolder serverHolder2 = new ServerHolder( + new DruidServer("server2", "host2", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(), + new LoadQueuePeonTester()); + + serverHolders = new ArrayList<>(); + serverHolders.add(serverHolder1); + serverHolders.add(serverHolder2); + + final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders); + // since there is not enough nodes to load 3 replicas of segment + Assert.assertNull(foundServerHolder); + } + + @Test + public void findNewSegmentHomeReplicatorEnoughSpace() + { + final ServerHolder serverHolder = new ServerHolder( + new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).toImmutableDruidServer(), + new LoadQueuePeonTester()); + serverHolders = new ArrayList<>(); + serverHolders.add(serverHolder); + final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders); + // since there is enough space on server it should be selected + Assert.assertEquals(serverHolder, foundServerHolder); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index 96f38c3a8521..e6a068e0c818 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -37,6 +37,7 @@ import org.apache.druid.server.coordinator.rules.ForeverLoadRule; import org.apache.druid.server.coordinator.rules.IntervalDropRule; import org.apache.druid.server.coordinator.rules.IntervalLoadRule; +import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; @@ -109,6 +110,193 @@ public void tearDown() EasyMock.verify(databaseRuleManager); } + /** + * Tier - __default_tier + * Nodes - 2 + * Replicants - 3 + * Random balancer strategy should not assign anything and not get into loop as there are not enough nodes for replication + */ + @Test(timeout = 5000L) + public void testTwoNodesOneTierThreeReplicantsRandomStrategyNotEnoughNodes() + { + mockCoordinator(); + mockEmptyPeon(); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, 3) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + new Interval(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment) + .toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + new DruidServer("server2", "host2", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + + Assert.assertEquals(0L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + EasyMock.verify(mockPeon); + } + + + /** + * Tier - __default_tier + * Nodes - 1 + * Replicants - 1 + * Random balancer strategy should select the only node + */ + @Test(timeout = 5000L) + public void testOneNodesOneTierOneReplicantRandomStrategyEnoughSpace() + { + mockCoordinator(); + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + mockEmptyPeon(); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, 1) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + new Interval(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + Assert.assertEquals(1L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + EasyMock.verify(mockPeon); + } + + /** + * Tier - __default_tier + * Nodes - 1 + * Replicants - 1 + * Random balancer strategy should not assign anything as there is not enough space + */ + @Test(timeout = 5000L) + public void testOneNodesOneTierOneReplicantRandomStrategyNotEnoughSpace() + { + mockCoordinator(); + mockEmptyPeon(); + int numReplicants = 1; + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + new Interval(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 11 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment failed + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + EasyMock.verify(mockPeon); + } + /** * Nodes: * hot - 1 replicant @@ -193,17 +381,31 @@ private DruidCoordinatorRuntimeParams.Builder makeCoordinatorRuntimeParams( BalancerStrategy balancerStrategy ) { - return createCoordinatorRuntimeParams(druidCluster) - .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster())) + return makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, usedSegments); + } + + private DruidCoordinatorRuntimeParams.Builder makeCoordinatorRuntimeParams( + DruidCluster druidCluster, + BalancerStrategy balancerStrategy, + List dataSegments + ) + { + return createCoordinatorRuntimeParams(druidCluster, dataSegments) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) .withBalancerStrategy(balancerStrategy); } private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster) + { + return createCoordinatorRuntimeParams(druidCluster, usedSegments); + } + + private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster, List dataSegments) { return CoordinatorRuntimeParamsTestHelpers .newBuilder() .withDruidCluster(druidCluster) - .withUsedSegmentsInTest(usedSegments) + .withUsedSegmentsInTest(dataSegments) .withDatabaseRuleManager(databaseRuleManager); } From c731ae946ca1bfff229402db8245c723c5a022c7 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Wed, 24 Jun 2020 18:25:02 +0530 Subject: [PATCH 3/6] comment --- .../druid/server/coordinator/RandomBalancerStrategy.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java index 491c162c8754..de3e46e66f43 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java @@ -35,8 +35,7 @@ public class RandomBalancerStrategy implements BalancerStrategy @Override public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List serverHolders) { - // filter out servers whose avaialable size is less than required to serve this segment and those who are already - // serving this segment + // filter out servers whose avaialable size is less than required for this segment and those already serving this segment final List usableServerHolders = serverHolders.stream().filter( serverHolder -> serverHolder.getAvailableSize() >= proposalSegment.getSize() && !serverHolder.isServingSegment( proposalSegment) From 1b6492dc2ac5f93de0ef794cfdba13f4286f4497 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Wed, 24 Jun 2020 23:33:01 +0530 Subject: [PATCH 4/6] added more tests --- .../server/coordinator/RunRulesTest.java | 436 ++++++++++-------- 1 file changed, 249 insertions(+), 187 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index e6a068e0c818..10922a467610 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -110,193 +110,6 @@ public void tearDown() EasyMock.verify(databaseRuleManager); } - /** - * Tier - __default_tier - * Nodes - 2 - * Replicants - 3 - * Random balancer strategy should not assign anything and not get into loop as there are not enough nodes for replication - */ - @Test(timeout = 5000L) - public void testTwoNodesOneTierThreeReplicantsRandomStrategyNotEnoughNodes() - { - mockCoordinator(); - mockEmptyPeon(); - - EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( - Collections.singletonList( - new ForeverLoadRule( - ImmutableMap.of(DruidServer.DEFAULT_TIER, 3) - ) - )).atLeastOnce(); - EasyMock.replay(databaseRuleManager); - - DataSegment dataSegment = new DataSegment( - "test", - new Interval(0, 1), - DateTimes.nowUtc().toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - IndexIO.CURRENT_VERSION_ID, - 1 - ); - - DruidCluster druidCluster = DruidClusterBuilder - .newBuilder() - .addTier( - DruidServer.DEFAULT_TIER, - new ServerHolder( - new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment) - .toImmutableDruidServer(), - mockPeon - ), - new ServerHolder( - new DruidServer("server2", "host2", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment) - .toImmutableDruidServer(), - mockPeon - ) - ) - .build(); - - RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); - - DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) - .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) - .build(); - - DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); - CoordinatorStats stats = afterParams.getCoordinatorStats(); - - Assert.assertEquals(0L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER)); - Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); - Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); - - EasyMock.verify(mockPeon); - } - - - /** - * Tier - __default_tier - * Nodes - 1 - * Replicants - 1 - * Random balancer strategy should select the only node - */ - @Test(timeout = 5000L) - public void testOneNodesOneTierOneReplicantRandomStrategyEnoughSpace() - { - mockCoordinator(); - mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().atLeastOnce(); - mockEmptyPeon(); - - EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( - Collections.singletonList( - new ForeverLoadRule( - ImmutableMap.of(DruidServer.DEFAULT_TIER, 1) - ) - )).atLeastOnce(); - EasyMock.replay(databaseRuleManager); - - DataSegment dataSegment = new DataSegment( - "test", - new Interval(0, 1), - DateTimes.nowUtc().toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - IndexIO.CURRENT_VERSION_ID, - 1 - ); - - DruidCluster druidCluster = DruidClusterBuilder - .newBuilder() - .addTier( - DruidServer.DEFAULT_TIER, - new ServerHolder( - new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) - .toImmutableDruidServer(), - mockPeon - ) - ) - .build(); - - RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); - - DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) - .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) - .build(); - - DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); - CoordinatorStats stats = afterParams.getCoordinatorStats(); - Assert.assertEquals(1L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER)); - Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); - Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); - - EasyMock.verify(mockPeon); - } - - /** - * Tier - __default_tier - * Nodes - 1 - * Replicants - 1 - * Random balancer strategy should not assign anything as there is not enough space - */ - @Test(timeout = 5000L) - public void testOneNodesOneTierOneReplicantRandomStrategyNotEnoughSpace() - { - mockCoordinator(); - mockEmptyPeon(); - int numReplicants = 1; - EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( - Collections.singletonList( - new ForeverLoadRule( - ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants) - ) - )).atLeastOnce(); - EasyMock.replay(databaseRuleManager); - - DataSegment dataSegment = new DataSegment( - "test", - new Interval(0, 1), - DateTimes.nowUtc().toString(), - new HashMap<>(), - new ArrayList<>(), - new ArrayList<>(), - NoneShardSpec.instance(), - IndexIO.CURRENT_VERSION_ID, - 11 - ); - - DruidCluster druidCluster = DruidClusterBuilder - .newBuilder() - .addTier( - DruidServer.DEFAULT_TIER, - new ServerHolder( - new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) - .toImmutableDruidServer(), - mockPeon - ) - ) - .build(); - - RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); - - DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) - .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) - .build(); - - DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); - CoordinatorStats stats = afterParams.getCoordinatorStats(); - Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER)); - Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment failed - Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); - Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); - - EasyMock.verify(mockPeon); - } - /** * Nodes: * hot - 1 replicant @@ -1269,6 +1082,255 @@ public void testRulesRunOnNonOvershadowedSegmentsOnly() exec.shutdown(); } + /** + * Tier - __default_tier + * Nodes - 2 + * Replicants - 3 + * Random balancer strategy should not assign anything and not get into loop as there are not enough nodes for replication + */ + @Test(timeout = 5000L) + public void testTwoNodesOneTierThreeReplicantsRandomStrategyNotEnoughNodes() + { + mockCoordinator(); + mockEmptyPeon(); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, 3) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + new Interval(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment) + .toImmutableDruidServer(), + mockPeon + ), + new ServerHolder( + new DruidServer("server2", "host2", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + + Assert.assertEquals(0L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + EasyMock.verify(mockPeon); + } + + + /** + * Tier - __default_tier + * Nodes - 1 + * Replicants - 1 + * Random balancer strategy should select the only node + */ + @Test(timeout = 5000L) + public void testOneNodesOneTierOneReplicantRandomStrategyEnoughSpace() + { + mockCoordinator(); + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + mockEmptyPeon(); + + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, 1) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + new Interval(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + Assert.assertEquals(1L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + EasyMock.verify(mockPeon); + } + + /** + * Tier - __default_tier + * Nodes - 1 + * Replicants - 1 + * Random balancer strategy should not assign anything as there is not enough space + */ + @Test(timeout = 5000L) + public void testOneNodesOneTierOneReplicantRandomStrategyNotEnoughSpace() + { + mockCoordinator(); + mockEmptyPeon(); + int numReplicants = 1; + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + new Interval(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 11 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy(); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment failed + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + EasyMock.verify(mockPeon); + } + + /** + * Tier - __default_tier + * Nodes - 1 + * Replicants - 1 + * Cost balancer strategy should not assign anything as there is not enough space + */ + @Test + public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace() + { + mockCoordinator(); + mockEmptyPeon(); + int numReplicants = 1; + EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn( + Collections.singletonList( + new ForeverLoadRule( + ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants) + ) + )).atLeastOnce(); + EasyMock.replay(databaseRuleManager); + + DataSegment dataSegment = new DataSegment( + "test", + new Interval(0, 1), + DateTimes.nowUtc().toString(), + new HashMap<>(), + new ArrayList<>(), + new ArrayList<>(), + NoneShardSpec.instance(), + IndexIO.CURRENT_VERSION_ID, + 11 + ); + + DruidCluster druidCluster = DruidClusterBuilder + .newBuilder() + .addTier( + DruidServer.DEFAULT_TIER, + new ServerHolder( + new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0) + .toImmutableDruidServer(), + mockPeon + ) + ) + .build(); + + ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1)); + CostBalancerStrategy balancerStrategy = new CostBalancerStrategy(exec); + + DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment)) + .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build()) + .build(); + + DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params); + CoordinatorStats stats = afterParams.getCoordinatorStats(); + Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER)); + Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment should fail + Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty()); + Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty()); + + exec.shutdown(); + EasyMock.verify(mockPeon); + } + private void mockCoordinator() { EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes(); From 819e8bcc04a4e1c2cd930ae9e529950e495e6883 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Thu, 25 Jun 2020 01:21:40 +0530 Subject: [PATCH 5/6] fix forbidden apis --- .../druid/server/coordinator/BalancerStrategyTest.java | 8 ++++---- .../org/apache/druid/server/coordinator/RunRulesTest.java | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java index 76c8abcbbae3..127af6d37f5f 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java @@ -19,12 +19,12 @@ package org.apache.druid.server.coordinator; -import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.client.DruidServer; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; -import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -48,7 +48,7 @@ public static Iterable data() { return Arrays.asList( new Object[][]{ - {new CostBalancerStrategy(MoreExecutors.sameThreadExecutor())}, + {new CostBalancerStrategy(Execs.directExecutor())}, {new RandomBalancerStrategy()} } ); @@ -64,7 +64,7 @@ public void setUp() { this.proposedDataSegment = new DataSegment( "datasource1", - new Interval(0, 1), + Intervals.utc(0, 1), "", new HashMap<>(), new ArrayList<>(), diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java index 10922a467610..e138111c76ee 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java @@ -1104,7 +1104,7 @@ public void testTwoNodesOneTierThreeReplicantsRandomStrategyNotEnoughNodes() DataSegment dataSegment = new DataSegment( "test", - new Interval(0, 1), + Intervals.utc(0, 1), DateTimes.nowUtc().toString(), new HashMap<>(), new ArrayList<>(), @@ -1172,7 +1172,7 @@ public void testOneNodesOneTierOneReplicantRandomStrategyEnoughSpace() DataSegment dataSegment = new DataSegment( "test", - new Interval(0, 1), + Intervals.utc(0, 1), DateTimes.nowUtc().toString(), new HashMap<>(), new ArrayList<>(), @@ -1231,7 +1231,7 @@ public void testOneNodesOneTierOneReplicantRandomStrategyNotEnoughSpace() DataSegment dataSegment = new DataSegment( "test", - new Interval(0, 1), + Intervals.utc(0, 1), DateTimes.nowUtc().toString(), new HashMap<>(), new ArrayList<>(), @@ -1291,7 +1291,7 @@ public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace() DataSegment dataSegment = new DataSegment( "test", - new Interval(0, 1), + Intervals.utc(0, 1), DateTimes.nowUtc().toString(), new HashMap<>(), new ArrayList<>(), From a0b0df3200087fed0a3b2544523e78f4ad87f412 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Thu, 25 Jun 2020 15:25:23 +0530 Subject: [PATCH 6/6] fix typo --- .../apache/druid/server/coordinator/BalancerStrategyTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java index 127af6d37f5f..b4d3ac5d11dc 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java @@ -85,7 +85,7 @@ public void findNewSegmentHomeReplicatorNotEnoughSpace() serverHolders = new ArrayList<>(); serverHolders.add(serverHolder); final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders); - // since there is not enough space on server having avaialable size 10L to host a segment of size 11L, it should be null + // since there is not enough space on server having available size 10L to host a segment of size 11L, it should be null Assert.assertNull(foundServerHolder); }