Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.DataSourcesSnapshot;
Expand Down Expand Up @@ -69,7 +68,6 @@
import org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.initialization.ZkPathsConfig;
Expand Down Expand Up @@ -275,13 +273,6 @@ public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataS
)
{
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
final Set<String> decommissioningServers = getDynamicConfigs().getDecommissioningNodes();
final List<ImmutableDruidServer> broadcastTargetServers = serverInventoryView
.getInventory()
.stream()
.filter(druidServer -> druidServer.isSegmentBroadcastTarget() && !decommissioningServers.contains(druidServer.getHost()))
.map(DruidServer::toImmutableDruidServer)
.collect(Collectors.toList());

if (segmentReplicantLookup == null) {
return underReplicationCountsPerDataSourcePerTier;
Expand All @@ -294,36 +285,19 @@ public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataS

for (final Rule rule : rules) {
if (!rule.appliesTo(segment, now)) {
// Rule did not match. Continue to the next Rule.
continue;
}

if (rule instanceof LoadRule) {
((LoadRule) rule)
.getTieredReplicants()
.forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
});
if (!rule.canLoadSegments()) {
// Rule matched but rule does not and cannot load segments.
// Hence, there is no need to update underReplicationCountsPerDataSourcePerTier map
break;
}

if (rule instanceof BroadcastDistributionRule) {
for (ImmutableDruidServer server : broadcastTargetServers) {
Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
.computeIfAbsent(server.getTier(), ignored -> new Object2LongOpenHashMap<>());
if (server.getSegment(segment.getId()) == null) {
((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
.addTo(segment.getDataSource(), 1);
} else {
// This make sure that every datasource has a entry even if the all segments are loaded
underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0);
}
}
}
rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment);

// only the first matching rule applies
// Only the first matching rule applies. This is because the Coordinator cycle through all used segments
// and match each segment with the first rule that applies. Each segment may only match a single rule.
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
Expand Down Expand Up @@ -62,19 +64,22 @@ public static SegmentReplicantLookup make(DruidCluster cluster)
}
}

return new SegmentReplicantLookup(segmentsInCluster, loadingSegments);
return new SegmentReplicantLookup(segmentsInCluster, loadingSegments, cluster);
}

private final Table<SegmentId, String, Integer> segmentsInCluster;
private final Table<SegmentId, String, Integer> loadingSegments;
private final DruidCluster cluster;

private SegmentReplicantLookup(
Table<SegmentId, String, Integer> segmentsInCluster,
Table<SegmentId, String, Integer> loadingSegments
Table<SegmentId, String, Integer> loadingSegments,
DruidCluster cluster
)
{
this.segmentsInCluster = segmentsInCluster;
this.loadingSegments = loadingSegments;
this.cluster = cluster;
}

public Map<String, Integer> getClusterTiers(SegmentId segmentId)
Expand All @@ -93,7 +98,7 @@ int getLoadedReplicants(SegmentId segmentId)
return retVal;
}

int getLoadedReplicants(SegmentId segmentId, String tier)
public int getLoadedReplicants(SegmentId segmentId, String tier)
{
Integer retVal = segmentsInCluster.get(segmentId, tier);
return (retVal == null) ? 0 : retVal;
Expand Down Expand Up @@ -124,4 +129,21 @@ public int getTotalReplicants(SegmentId segmentId, String tier)
{
return getLoadedReplicants(segmentId, tier) + getLoadingReplicants(segmentId, tier);
}

public Object2LongMap<String> getBroadcastUnderReplication(SegmentId segmentId)
{
Object2LongOpenHashMap<String> perTier = new Object2LongOpenHashMap<>();
for (ServerHolder holder : cluster.getAllServers()) {
// Only record tier entry for server that is segment broadcast target
if (holder.getServer().getType().isSegmentBroadcastTarget()) {
// Every broadcast target server should be serving 1 replica of the segment
if (!holder.isServingSegment(segmentId)) {
perTier.addTo(holder.getServer().getTier(), 1L);
} else {
perTier.putIfAbsent(holder.getServer().getTier(), 0);
}
}
}
return perTier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

import java.util.Objects;

Expand Down Expand Up @@ -114,7 +115,7 @@ public long getAvailableSize()

public boolean isServingSegment(DataSegment segment)
{
return server.getSegment(segment.getId()) != null;
return isServingSegment(segment.getId());
}

public boolean isLoadingSegment(DataSegment segment)
Expand All @@ -132,6 +133,11 @@ public int getNumberOfSegmentsInQueue()
return peon.getNumberOfSegmentsInQueue();
}

public boolean isServingSegment(SegmentId segmentId)
{
return server.getSegment(segmentId) != null;
}

@Override
public int compareTo(ServerHolder serverHolder)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

package org.apache.druid.server.coordinator.rules;

import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -71,6 +75,37 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim
.accumulate(drop(dropServerHolders, segment));
}

@Override
public boolean canLoadSegments()
{
return true;
}

@Override
public void updateUnderReplicated(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DataSegment segment
)
{
Object2LongMap<String> underReplicatedBroadcastTiers = segmentReplicantLookup.getBroadcastUnderReplication(segment.getId());
for (final Object2LongMap.Entry<String> entry : underReplicatedBroadcastTiers.object2LongEntrySet()) {
final String tier = entry.getKey();
final long underReplicatedCount = entry.getLongValue();
underReplicatedPerTier.compute(tier, (_tier, existing) -> {
Object2LongMap<String> underReplicationPerDataSource = existing;
if (existing == null) {
underReplicationPerDataSource = new Object2LongOpenHashMap<>();
}
underReplicationPerDataSource.compute(
segment.getDataSource(),
(_datasource, count) -> count != null ? count + underReplicatedCount : underReplicatedCount
);
return underReplicationPerDataSource;
});
}
}

private CoordinatorStats assign(
final Set<ServerHolder> serverHolders,
final DataSegment segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,10 @@ public CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntim
stats.addToGlobalStat("deletedCount", 1);
return stats;
}

@Override
public boolean canLoadSegments()
{
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
Expand All @@ -30,6 +32,7 @@
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ReplicationThrottler;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
Expand Down Expand Up @@ -90,6 +93,32 @@ public CoordinatorStats run(
}
}

@Override
public boolean canLoadSegments()
{
return true;
}

@Override
public void updateUnderReplicated(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DataSegment segment
)
{
getTieredReplicants().forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
Object2LongMap<String> underReplicationPerDataSource = underReplicatedPerTier.computeIfAbsent(
tier,
ignored -> new Object2LongOpenHashMap<>()
);
((Object2LongOpenHashMap<String>) underReplicationPerDataSource).addTo(
segment.getDataSource(),
Math.max(ruleReplicants - currentReplicants, 0)
);
});
}

/**
* @param stats {@link CoordinatorStats} to accumulate assignment statistics.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.SegmentReplicantLookup;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.Map;

/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
Expand All @@ -51,6 +56,27 @@ public interface Rule

boolean appliesTo(Interval interval, DateTime referenceTimestamp);

/**
* Return true if this Rule can load segment onto one or more type of Druid node, otherwise return false.
* Any Rule that returns true for this method should implement logic for calculating segment under replicated
* in {@link Rule#updateUnderReplicated}
*/
boolean canLoadSegments();

/**
* This method should update the {@param underReplicatedPerTier} with the replication count of the
* {@param segment}. Rule that returns true for {@link Rule#canLoadSegments()} must override this method.
* Note that {@param underReplicatedPerTier} is a map of tier -> { dataSource -> underReplicationCount }
*/
default void updateUnderReplicated(
Map<String, Object2LongMap<String>> underReplicatedPerTier,
SegmentReplicantLookup segmentReplicantLookup,
DataSegment segment
)
{
Preconditions.checkArgument(!canLoadSegments());
}

/**
* {@link DruidCoordinatorRuntimeParams#getUsedSegments()} must not be called in Rule's code, because the used
* segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. This is because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class ServerHolderTest
{
private static final List<DataSegment> SEGMENTS = ImmutableList.of(
new DataSegment(
"test",
"src1",
Intervals.of("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", "container1", "blobPath", "blobPath1"),
Expand All @@ -50,7 +50,7 @@ public class ServerHolderTest
1
),
new DataSegment(
"test",
"src2",
Intervals.of("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", "container2", "blobPath", "blobPath2"),
Expand Down Expand Up @@ -177,4 +177,22 @@ public void testEquals()
Assert.assertNotEquals(h1, h4);
Assert.assertNotEquals(h1, h5);
}

@Test
public void testIsServingSegment()
{
final ServerHolder h1 = new ServerHolder(
new ImmutableDruidServer(
new DruidServerMetadata("name1", "host1", null, 100L, ServerType.HISTORICAL, "tier1", 0),
0L,
ImmutableMap.of("src1", DATA_SOURCES.get("src1")),
1
),
new LoadQueuePeonTester()
);
Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0).getId()));
Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1).getId()));
}
}