From 34c7dbf24f3b23dd9d5051fa840200d5ebf02955 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 18 Jun 2020 16:13:04 -1000 Subject: [PATCH 1/4] Add safeguard to make sure new Rules added are aware of Rule usage in loadstatus API --- .../druid/server/coordinator/DruidCoordinator.java | 11 ++++++----- .../coordinator/rules/BroadcastDistributionRule.java | 6 ++++++ .../druid/server/coordinator/rules/DropRule.java | 6 ++++++ .../druid/server/coordinator/rules/LoadRule.java | 6 ++++++ .../apache/druid/server/coordinator/rules/Rule.java | 8 ++++++++ 5 files changed, 32 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 0dcf636b7442..91df5de08612 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -293,7 +293,7 @@ public Map> computeUnderReplicationCountsPerDataS final List rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource()); for (final Rule rule : rules) { - if (!rule.appliesTo(segment, now)) { + if (!rule.matchLoadStatusCount() || !rule.appliesTo(segment, now)) { continue; } @@ -307,9 +307,7 @@ public Map> computeUnderReplicationCountsPerDataS ((Object2LongOpenHashMap) underReplicationPerDataSource) .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0)); }); - } - - if (rule instanceof BroadcastDistributionRule) { + } else if (rule instanceof BroadcastDistributionRule) { for (ImmutableDruidServer server : broadcastTargetServers) { Object2LongMap underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier .computeIfAbsent(server.getTier(), ignored -> new Object2LongOpenHashMap<>()); @@ -321,9 +319,12 @@ public Map> computeUnderReplicationCountsPerDataS 
underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0); } } + } else { + log.error("Rule class [%s] returns matchLoadStatusCount=true but did not implement compute logic", rule.getClass()); } - // only the first matching rule applies + // Only the first matching rule applies. This is because the Coordinator cycle through all used segments + // and match each segment with the first rule that applies. Each segment may only match a single rule. break; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java index 35ff39ea6505..a88e49064df3 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java @@ -71,6 +71,12 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim .accumulate(drop(dropServerHolders, segment)); } + @Override + public boolean matchLoadStatusCount() + { + return true; + } + private CoordinatorStats assign( final Set serverHolders, final DataSegment segment diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java index c565df9b58be..f3c18d6fba75 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java @@ -37,4 +37,10 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim stats.addToGlobalStat("deletedCount", 1); return stats; } + + @Override + public boolean matchLoadStatusCount() + { + return false; + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index 80912544a4e9..a85182226d59 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -90,6 +90,12 @@ public CoordinatorStats run( } } + @Override + public boolean matchLoadStatusCount() + { + return true; + } + /** * @param stats {@link CoordinatorStats} to accumulate assignment statistics. */ diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java index d475f67d4c1b..71754f8179c9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java @@ -51,6 +51,14 @@ public interface Rule boolean appliesTo(Interval interval, DateTime referenceTimestamp); + /** + * Returns whether this Rules should be matched and considered in loadstatus API. + * In general, Rules that load segment onto any Druid node should return true. + * Any Rule that returns true for this method should add a compute logic (as if case condition) for the particular + * Rule class in {@link DruidCoordinator#computeUnderReplicationCountsPerDataSourcePerTierForSegments} + */ + boolean matchLoadStatusCount(); + /** * {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be called in Rule's code, because the used * segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. 
This is because From 9b6dc750ebd27525d833387968fa50130ed8f492 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 18 Jun 2020 23:06:17 -1000 Subject: [PATCH 2/4] address comments --- .../server/coordinator/DruidCoordinator.java | 41 ++++--------------- .../coordinator/SegmentReplicantLookup.java | 24 +++++++++-- .../server/coordinator/ServerHolder.java | 6 +++ .../rules/BroadcastDistributionRule.java | 29 ++++++++++++- .../server/coordinator/rules/DropRule.java | 2 +- .../server/coordinator/rules/LoadRule.java | 25 ++++++++++- .../druid/server/coordinator/rules/Rule.java | 28 ++++++++++--- 7 files changed, 111 insertions(+), 44 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 91df5de08612..bc8394c52ae9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -275,13 +275,6 @@ public Map> computeUnderReplicationCountsPerDataS ) { final Map> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); - final Set decommissioningServers = getDynamicConfigs().getDecommissioningNodes(); - final List broadcastTargetServers = serverInventoryView - .getInventory() - .stream() - .filter(druidServer -> druidServer.isSegmentBroadcastTarget() && !decommissioningServers.contains(druidServer.getHost())) - .map(DruidServer::toImmutableDruidServer) - .collect(Collectors.toList()); if (segmentReplicantLookup == null) { return underReplicationCountsPerDataSourcePerTier; @@ -293,36 +286,18 @@ public Map> computeUnderReplicationCountsPerDataS final List rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource()); for (final Rule rule : rules) { - if (!rule.matchLoadStatusCount() || !rule.appliesTo(segment, now)) { + if (!rule.appliesTo(segment, now)) { + // Rule did not match. 
Continue to the next Rule. continue; } - - if (rule instanceof LoadRule) { - ((LoadRule) rule) - .getTieredReplicants() - .forEach((final String tier, final Integer ruleReplicants) -> { - int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier); - Object2LongMap underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier - .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>()); - ((Object2LongOpenHashMap) underReplicationPerDataSource) - .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0)); - }); - } else if (rule instanceof BroadcastDistributionRule) { - for (ImmutableDruidServer server : broadcastTargetServers) { - Object2LongMap underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier - .computeIfAbsent(server.getTier(), ignored -> new Object2LongOpenHashMap<>()); - if (server.getSegment(segment.getId()) == null) { - ((Object2LongOpenHashMap) underReplicationPerDataSource) - .addTo(segment.getDataSource(), 1); - } else { - // This make sure that every datasource has a entry even if the all segments are loaded - underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0); - } - } - } else { - log.error("Rule class [%s] returns matchLoadStatusCount=true but did not implement compute logic", rule.getClass()); + if (!rule.canLoadSegments()) { + // Rule matched but rule does not and cannot load segments. + // Hence, there is no need to update underReplicationCountsPerDataSourcePerTier map + break; } + rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment); + // Only the first matching rule applies. This is because the Coordinator cycle through all used segments // and match each segment with the first rule that applies. Each segment may only match a single rule. 
break; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java index cd04bfdcb8a5..a640cb9ae552 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java @@ -21,6 +21,8 @@ import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -62,19 +64,22 @@ public static SegmentReplicantLookup make(DruidCluster cluster) } } - return new SegmentReplicantLookup(segmentsInCluster, loadingSegments); + return new SegmentReplicantLookup(segmentsInCluster, loadingSegments, cluster); } private final Table segmentsInCluster; private final Table loadingSegments; + private final DruidCluster cluster; private SegmentReplicantLookup( Table segmentsInCluster, - Table loadingSegments + Table loadingSegments, + DruidCluster cluster ) { this.segmentsInCluster = segmentsInCluster; this.loadingSegments = loadingSegments; + this.cluster = cluster; } public Map getClusterTiers(SegmentId segmentId) @@ -93,7 +98,7 @@ int getLoadedReplicants(SegmentId segmentId) return retVal; } - int getLoadedReplicants(SegmentId segmentId, String tier) + public int getLoadedReplicants(SegmentId segmentId, String tier) { Integer retVal = segmentsInCluster.get(segmentId, tier); return (retVal == null) ? 
0 : retVal; @@ -124,4 +129,17 @@ public int getTotalReplicants(SegmentId segmentId, String tier) { return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier); } + + public Object2LongMap getBroadcastUnderReplication(SegmentId segmentId) + { + Object2LongOpenHashMap perTier = new Object2LongOpenHashMap<>(); + for (ServerHolder holder : cluster.getAllServers()) { + if (holder.getServer().getType().isSegmentBroadcastTarget() && !holder.isServingSegment(segmentId)) { + perTier.addTo(holder.getServer().getTier(), 1L); + } else { + perTier.putIfAbsent(holder.getServer().getTier(), 0); + } + } + return perTier; + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index 26fa9a54c7ff..db2422637ae8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -22,6 +22,7 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import java.util.Objects; @@ -132,6 +133,11 @@ public int getNumberOfSegmentsInQueue() return peon.getNumberOfSegmentsInQueue(); } + public boolean isServingSegment(SegmentId segmentId) + { + return server.getSegment(segmentId) != null; + } + @Override public int compareTo(ServerHolder serverHolder) { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java index a88e49064df3..4bed76970969 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java @@ -19,15 +19,19 @@ 
package org.apache.druid.server.coordinator.rules; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.timeline.DataSegment; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -72,11 +76,34 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim } @Override - public boolean matchLoadStatusCount() + public boolean canLoadSegments() { return true; } + @Override + public void updateUnderReplicated( + Map> underReplicatedPerTier, + SegmentReplicantLookup segmentReplicantLookup, + DataSegment segment + ) + { + Object2LongMap underReplicatedBroadcastTiers = segmentReplicantLookup.getBroadcastUnderReplication(segment.getId()); + for (String tier : underReplicatedBroadcastTiers.keySet()) { + underReplicatedPerTier.compute(tier, (_tier, existing) -> { + Object2LongMap underReplicationPerDataSource = existing; + if (existing == null) { + underReplicationPerDataSource = new Object2LongOpenHashMap<>(); + } + underReplicationPerDataSource.compute( + segment.getDataSource(), + (datasource, count) -> count != null ? 
count + 1L : 0L + ); + return underReplicationPerDataSource; + }); + } + } + private CoordinatorStats assign( final Set serverHolders, final DataSegment segment diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java index f3c18d6fba75..7ffc7a211551 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/DropRule.java @@ -39,7 +39,7 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim } @Override - public boolean matchLoadStatusCount() + public boolean canLoadSegments() { return false; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java index a85182226d59..d0bf5b8ad56b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java @@ -21,6 +21,8 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import it.unimi.dsi.fastutil.objects.Object2LongMap; +import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -30,6 +32,7 @@ import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; import org.apache.druid.server.coordinator.ReplicationThrottler; +import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.server.coordinator.ServerHolder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -91,11 +94,31 @@ public CoordinatorStats 
run( } @Override - public boolean matchLoadStatusCount() + public boolean canLoadSegments() { return true; } + @Override + public void updateUnderReplicated( + Map> underReplicatedPerTier, + SegmentReplicantLookup segmentReplicantLookup, + DataSegment segment + ) + { + getTieredReplicants().forEach((final String tier, final Integer ruleReplicants) -> { + int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier); + Object2LongMap underReplicationPerDataSource = underReplicatedPerTier.computeIfAbsent( + tier, + ignored -> new Object2LongOpenHashMap<>() + ); + ((Object2LongOpenHashMap) underReplicationPerDataSource).addTo( + segment.getDataSource(), + Math.max(ruleReplicants - currentReplicants, 0) + ); + }); + } + /** * @param stats {@link CoordinatorStats} to accumulate assignment statistics. */ diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java index 71754f8179c9..02c552f639cc 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java @@ -21,13 +21,18 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.druid.server.coordinator.CoordinatorStats; import org.apache.druid.server.coordinator.DruidCoordinator; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.SegmentReplicantLookup; import org.apache.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.Map; + /** */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @@ -52,12 +57,25 @@ public interface Rule boolean appliesTo(Interval interval, DateTime referenceTimestamp); 
/** - * Returns whether this Rules should be matched and considered in loadstatus API. - * In general, Rules that load segment onto any Druid node should return true. - * Any Rule that returns true for this method should add a compute logic (as if case condition) for the particular - * Rule class in {@link DruidCoordinator#computeUnderReplicationCountsPerDataSourcePerTierForSegments} + * Return true if this Rule can load segment onto one or more type of Druid node, otherwise return false. + * Any Rule that returns true for this method should implement logic for calculating segment under replicated + * in {@link Rule#updateUnderReplicated} + */ + boolean canLoadSegments(); + + /** + * This method should update the {@param underReplicatedPerTier} with the replication count of the + * {@param segment}. Rule that returns true for {@link Rule#canLoadSegments()} must override this method. + * Note that {@param underReplicatedPerTier} is a map of tier -> { dataSource -> underReplicationCount } */ - boolean matchLoadStatusCount(); + default void updateUnderReplicated( + Map> underReplicatedPerTier, + SegmentReplicantLookup segmentReplicantLookup, + DataSegment segment + ) + { + Preconditions.checkArgument(!canLoadSegments()); + } /** * {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be called in Rule's code, because the used From 529022449da9762b4f47f0f541843b747bc6bb65 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 18 Jun 2020 23:07:07 -1000 Subject: [PATCH 3/4] address comments --- .../org/apache/druid/server/coordinator/DruidCoordinator.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index bc8394c52ae9..64168e11d76b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -29,7 +29,6 @@ import it.unimi.dsi.fastutil.objects.Object2IntMaps; import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import it.unimi.dsi.fastutil.objects.Object2LongMap; -import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.ZKPaths; import org.apache.druid.client.DataSourcesSnapshot; @@ -69,7 +68,6 @@ import org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments; import org.apache.druid.server.coordinator.duty.RunRules; import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments; -import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule; import org.apache.druid.server.coordinator.rules.LoadRule; import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.initialization.ZkPathsConfig; From 0714ddbe3d593700416a8e29d39691c74b69e917 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 18 Jun 2020 23:52:08 -1000 Subject: [PATCH 4/4] add tests --- .../coordinator/SegmentReplicantLookup.java | 12 ++++++---- .../server/coordinator/ServerHolder.java | 2 +- .../rules/BroadcastDistributionRule.java | 6 +++-- .../server/coordinator/ServerHolderTest.java | 22 +++++++++++++++++-- 4 files changed, 33 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java index a640cb9ae552..b86ca0106d39 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java @@ -134,10 +134,14 @@ public Object2LongMap getBroadcastUnderReplication(SegmentId segmentId) { Object2LongOpenHashMap perTier = new Object2LongOpenHashMap<>(); for (ServerHolder holder : 
cluster.getAllServers()) { - if (holder.getServer().getType().isSegmentBroadcastTarget() && !holder.isServingSegment(segmentId)) { - perTier.addTo(holder.getServer().getTier(), 1L); - } else { - perTier.putIfAbsent(holder.getServer().getTier(), 0); + // Only record tier entry for server that is segment broadcast target + if (holder.getServer().getType().isSegmentBroadcastTarget()) { + // Every broadcast target server should be serving 1 replica of the segment + if (!holder.isServingSegment(segmentId)) { + perTier.addTo(holder.getServer().getTier(), 1L); + } else { + perTier.putIfAbsent(holder.getServer().getTier(), 0); + } } } return perTier; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java index db2422637ae8..43fdaaef1d1d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java @@ -115,7 +115,7 @@ public long getAvailableSize() public boolean isServingSegment(DataSegment segment) { - return server.getSegment(segment.getId()) != null; + return isServingSegment(segment.getId()); } public boolean isLoadingSegment(DataSegment segment) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java index 4bed76970969..0b8c37b79318 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRule.java @@ -89,7 +89,9 @@ public void updateUnderReplicated( ) { Object2LongMap underReplicatedBroadcastTiers = segmentReplicantLookup.getBroadcastUnderReplication(segment.getId()); - for (String tier : underReplicatedBroadcastTiers.keySet()) { + for (final Object2LongMap.Entry entry : 
underReplicatedBroadcastTiers.object2LongEntrySet()) { + final String tier = entry.getKey(); + final long underReplicatedCount = entry.getLongValue(); underReplicatedPerTier.compute(tier, (_tier, existing) -> { Object2LongMap underReplicationPerDataSource = existing; if (existing == null) { @@ -97,7 +99,7 @@ public void updateUnderReplicated( } underReplicationPerDataSource.compute( segment.getDataSource(), - (datasource, count) -> count != null ? count + 1L : 0L + (_datasource, count) -> count != null ? count + underReplicatedCount : underReplicatedCount ); return underReplicationPerDataSource; }); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java index fcebbdee8209..cb1ee0425b61 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java @@ -39,7 +39,7 @@ public class ServerHolderTest { private static final List SEGMENTS = ImmutableList.of( new DataSegment( - "test", + "src1", Intervals.of("2015-04-12/2015-04-13"), "1", ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"), @@ -50,7 +50,7 @@ public class ServerHolderTest 1 ), new DataSegment( - "test", + "src2", Intervals.of("2015-04-12/2015-04-13"), "1", ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"), @@ -177,4 +177,22 @@ public void testEquals() Assert.assertNotEquals(h1, h4); Assert.assertNotEquals(h1, h5); } + + @Test + public void testIsServingSegment() + { + final ServerHolder h1 = new ServerHolder( + new ImmutableDruidServer( + new DruidServerMetadata("name1", "host1", null, 100L, ServerType.HISTORICAL, "tier1", 0), + 0L, + ImmutableMap.of("src1", DATA_SOURCES.get("src1")), + 1 + ), + new LoadQueuePeonTester() + ); + Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0))); + 
Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1))); + Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0).getId())); + Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1).getId())); + } }