From 384d6de9f2b9998b311073f4d6ae1eadba949e8d Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 23 Mar 2018 17:53:00 -0700 Subject: [PATCH 1/3] fix issue where assign primary assigns segments to all historical servers in cluster --- .../coordinator/SegmentReplicantLookup.java | 45 ++++++++++++++++++- .../server/coordinator/rules/LoadRule.java | 8 ++-- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java index a713c8d181c1..8dab73426ba3 100644 --- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java @@ -33,9 +33,12 @@ */ public class SegmentReplicantLookup { + + public static SegmentReplicantLookup make(DruidCluster cluster) { final Table segmentsInCluster = HashBasedTable.create(); + final Table loadingSegments = HashBasedTable.create(); for (SortedSet serversByType : cluster.getSortedHistoricalsByTier()) { for (ServerHolder serverHolder : serversByType) { @@ -48,17 +51,29 @@ public static SegmentReplicantLookup make(DruidCluster cluster) } segmentsInCluster.put(segment.getIdentifier(), server.getTier(), ++numReplicants); } + + // Also account for queued segments + for (DataSegment segment : serverHolder.getPeon().getSegmentsToLoad()) { + Integer numReplicants = loadingSegments.get(segment.getIdentifier(), server.getTier()); + if (numReplicants == null) { + numReplicants = 0; + } + loadingSegments.put(segment.getIdentifier(), server.getTier(), ++numReplicants); + } } } - return new SegmentReplicantLookup(segmentsInCluster); + return new SegmentReplicantLookup(segmentsInCluster, loadingSegments); } private final Table segmentsInCluster; - private SegmentReplicantLookup(Table segmentsInCluster) + private final Table loadingSegments; + + private SegmentReplicantLookup(Table segmentsInCluster, Table loadingSegments) { this.segmentsInCluster = segmentsInCluster; + this.loadingSegments = loadingSegments; } public Map getClusterTiers(String segmentId) @@ -82,4 +97,30 @@ public int getLoadedReplicants(String segmentId, String tier) Integer retVal = segmentsInCluster.get(segmentId, tier); return (retVal == null) ? 0 : retVal; } + + public int getLoadingReplicants(String segmentId, String tier) + { + Integer retVal = loadingSegments.get(segmentId, tier); + return (retVal == null) ? 0 : retVal; + } + + public int getLoadingReplicants(String segmentId) + { + Map allTiers = loadingSegments.row(segmentId); + int retVal = 0; + for (Integer replicants : allTiers.values()) { + retVal += replicants; + } + return retVal; + } + + public int getTotalReplicants(String segmentId) + { + return getLoadedReplicants(segmentId) + getLoadingReplicants(segmentId); + } + + public int getTotalReplicants(String segmentId, String tier) + { + return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier); + } } diff --git a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java index 5af4d822c4b3..dc615f800518 100644 --- a/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/io/druid/server/coordinator/rules/LoadRule.java @@ -95,8 +95,9 @@ private void assign( final CoordinatorStats stats ) { - // if primary replica already exists - if (!currentReplicants.isEmpty()) { + // if primary replica already exists or is loading + final int loading = params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier()); + if (!currentReplicants.isEmpty() || loading > 0) { assignReplicas(params, segment, stats, null); } else { final ServerHolder primaryHolderToLoad = assignPrimary(params, segment); @@ -169,7 +170,6 @@ private ServerHolder assignPrimary( if (targetReplicantsInTier <= 0) { continue; } - final String tier = entry.getKey(); final List holders = getFilteredHolders( @@ -228,7 +228,7 @@ private void assignReplicas( final int numAssigned = assignReplicasForTier( tier, entry.getIntValue(), - currentReplicants.getOrDefault(tier, 0), + params.getSegmentReplicantLookup().getTotalReplicants(segment.getIdentifier(), tier), params, createLoadQueueSizeLimitingPredicate(params), segment From 04969c58e8bb17e769238940195fdf6d84a6228d Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 23 Mar 2018 19:18:23 -0700 Subject: [PATCH 2/3] fix test --- .../server/coordinator/DruidCoordinatorRuleRunnerTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index 0b7c71ba2ce6..ba32af632ccc 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.client.DruidServer; @@ -942,6 +943,8 @@ public void testDropServerActuallyServesSegment() LoadQueuePeon anotherMockPeon = EasyMock.createMock(LoadQueuePeon.class); EasyMock.expect(anotherMockPeon.getLoadQueueSize()).andReturn(10L).atLeastOnce(); + EasyMock.expect(anotherMockPeon.getSegmentsToLoad()).andReturn(Sets.newHashSet()).anyTimes(); + EasyMock.replay(anotherMockPeon); DruidCluster druidCluster = new DruidCluster( From 2d559d40c50ca75b667d6225507aa2a0d1e36e94 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 30 Mar 2018 16:44:03 -0700 Subject: [PATCH 3/3] add test to ensure primary assignment will not assign to another server while loading is in progress --- .../coordinator/SegmentReplicantLookup.java | 2 - .../coordinator/rules/LoadRuleTest.java | 139 ++++++++++++++++++ 2 files changed, 139 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java index 8dab73426ba3..357eeb99be7c 100644 --- a/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/io/druid/server/coordinator/SegmentReplicantLookup.java @@ -33,8 +33,6 @@ */ public class SegmentReplicantLookup { - - public static SegmentReplicantLookup make(DruidCluster cluster) { final Table segmentsInCluster = HashBasedTable.create(); diff --git a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java index 51200461a959..4848503a5738 100644 --- a/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java +++ b/server/src/test/java/io/druid/server/coordinator/rules/LoadRuleTest.java @@ -20,7 +20,9 @@ package io.druid.server.coordinator.rules; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -58,7 +60,9 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -190,6 +194,127 @@ public void testLoad() EasyMock.verify(throttler, mockPeon, mockBalancerStrategy); } + @Test + public void testLoadPrimaryAssignDoesNotOverAssign() + { + EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes(); + + final LoadQueuePeon mockPeon = createEmptyPeon(); + mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject()); + EasyMock.expectLastCall().atLeastOnce(); + + LoadRule rule = createLoadRule(ImmutableMap.of( + "hot", 1 + )); + + final DataSegment segment = createDataSegment("foo"); + + EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject())) + .andDelegateTo(balancerStrategy) + .anyTimes(); + + EasyMock.replay(throttler, mockPeon, mockBalancerStrategy); + + DruidCluster druidCluster = new DruidCluster( + null, + ImmutableMap.of( + "hot", + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + mockPeon + ), new ServerHolder( + new DruidServer( + "serverHot2", + "hostHot2", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + mockPeon + ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) + ) + ); + + CoordinatorStats stats = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(druidCluster) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster)) + .withReplicationManager(throttler) + .withBalancerStrategy(mockBalancerStrategy) + .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) + .withAvailableSegments(Arrays.asList(segment)).build(), + segment + ); + + + Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + + // ensure multiple runs don't assign primary segment again if at replication count + final LoadQueuePeon loadingPeon = createLoadingPeon(ImmutableList.of(segment)); + EasyMock.replay(loadingPeon); + + DruidCluster afterLoad = new DruidCluster( + null, + ImmutableMap.of( + "hot", + Stream.of( + new ServerHolder( + new DruidServer( + "serverHot", + "hostHot", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + loadingPeon + ), new ServerHolder( + new DruidServer( + "serverHot2", + "hostHot2", + null, + 1000, + ServerType.HISTORICAL, + "hot", + 1 + ).toImmutableDruidServer(), + mockPeon + ) + ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder()))) + ) + ); + CoordinatorStats statsAfterLoadPrimary = rule.run( + null, + DruidCoordinatorRuntimeParams.newBuilder() + .withDruidCluster(afterLoad) + .withSegmentReplicantLookup(SegmentReplicantLookup.make(afterLoad)) + .withReplicationManager(throttler) + .withBalancerStrategy(mockBalancerStrategy) + .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01")) + .withAvailableSegments(Arrays.asList(segment)).build(), + segment + ); + + + Assert.assertEquals(0, statsAfterLoadPrimary.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot")); + + EasyMock.verify(throttler, mockPeon, mockBalancerStrategy); + } + @Test public void testLoadPriority() { @@ -619,4 +744,18 @@ private static LoadQueuePeon createEmptyPeon() return mockPeon; } + + private static LoadQueuePeon createLoadingPeon(List segments) + { + final Set segs = ImmutableSet.copyOf(segments); + final long loadingSize = segs.stream().mapToLong(DataSegment::getSize).sum(); + + final LoadQueuePeon mockPeon = EasyMock.createMock(LoadQueuePeon.class); + EasyMock.expect(mockPeon.getSegmentsToLoad()).andReturn(segs).anyTimes(); + EasyMock.expect(mockPeon.getSegmentsMarkedToDrop()).andReturn(Sets.newHashSet()).anyTimes(); + EasyMock.expect(mockPeon.getLoadQueueSize()).andReturn(loadingSize).anyTimes(); + EasyMock.expect(mockPeon.getNumberOfSegmentsInQueue()).andReturn(segs.size()).anyTimes(); + + return mockPeon; + } }