Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b38d061
Priority on loading for primary replica
xanec Jul 25, 2017
aaae2c3
Merge remote-tracking branch 'upstream/master' into prioritized-segment-loading
leventov Sep 6, 2017
e273af1
Simplicity fixes
leventov Sep 6, 2017
1a0e6a7
Fix on skipping drop for quick return.
xanec Sep 7, 2017
1fe5115
change to debug logging for no replicants.
xanec Sep 7, 2017
a98c0aa
Fix on filter logic
xanec Sep 7, 2017
b98fad7
swapping if-else
xanec Sep 7, 2017
97cc230
Fix on wrong "hasTier" logic
xanec Sep 8, 2017
18802ab
Refactoring of LoadRule
xanec Sep 8, 2017
4ee9f58
Rename createPredicate to createLoadQueueSizeLimitingPredicate
xanec Sep 11, 2017
d1feeb4
Rename getHolderList to getFilteredHolders
xanec Sep 11, 2017
1f5c047
remove varargs
xanec Sep 11, 2017
441203e
extract out currentReplicantsInTier
xanec Sep 11, 2017
0aedc65
rename holders to holdersInTier
xanec Sep 11, 2017
4b84190
don't do temporary removal of tier.
xanec Sep 11, 2017
15b6011
rename primaryTier to tierToSkip
xanec Sep 11, 2017
61ebb16
change LinkedList to ArrayList
xanec Sep 11, 2017
8b1e7a9
Merge remote-tracking branch 'upstream/master' into prioritized-segment-loading
leventov Sep 12, 2017
c4a4276
Merge remote-tracking branch 'upstream/master' into prioritized-segment-loading
leventov Sep 13, 2017
849022e
Change MinMaxPriorityQueue in DruidCluster to TreeSet.
xanec Sep 20, 2017
265a0d6
Merge branch 'prioritized-segment-loading' of github.com:metamx/druid…
xanec Sep 20, 2017
4e46501
Adding some comments.
xanec Sep 20, 2017
aef9fc7
Modify log messages in light of predicates.
xanec Sep 20, 2017
9a4766d
Add in-method comments
xanec Sep 20, 2017
40cd580
Don't create new Object2IntOpenHashMap for each run() call.
xanec Sep 21, 2017
748112c
Cache result from strategy call in the primary assignment to be reused.
xanec Sep 21, 2017
0dde8f5
Spelling mistake
xanec Sep 21, 2017
49c5cd7
Merge remote-tracking branch 'upstream/master' into prioritized-segment-loading
leventov Sep 21, 2017
558da4a
Cleaning up javadoc.
xanec Sep 21, 2017
8784e69
refactor out loading in progress check.
xanec Sep 21, 2017
bf3d79f
Removed redundant comment.
xanec Sep 26, 2017
33c6519
Removed forbidden API
xanec Sep 26, 2017
d7b7c55
Correct non-forbidden API.
xanec Sep 26, 2017
6630ae3
Precision in variable type for NavigableSet.
xanec Sep 28, 2017
174d73d
Obsolete comment.
xanec Sep 28, 2017
aae2df9
Clarity in method call and moving retrieval of ServerHolder into method.
xanec Sep 28, 2017
cb69cae
Comment on mutability of CoordinatorStats.
xanec Sep 28, 2017
428a820
Added auxiliary fixture for dropping.
xanec Sep 28, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions server/src/main/java/io/druid/server/coordinator/DruidCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@
package io.druid.server.coordinator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Ordering;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.IAE;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;

/**
* Contains a representation of the current state of the cluster by tier.
Expand All @@ -41,7 +42,7 @@
public class DruidCluster
{
private final Set<ServerHolder> realtimes;
private final Map<String, MinMaxPriorityQueue<ServerHolder>> historicals;
private final Map<String, NavigableSet<ServerHolder>> historicals;

public DruidCluster()
{
Expand All @@ -52,7 +53,7 @@ public DruidCluster()
@VisibleForTesting
public DruidCluster(
@Nullable Set<ServerHolder> realtimes,
Map<String, MinMaxPriorityQueue<ServerHolder>> historicals
Map<String, NavigableSet<ServerHolder>> historicals
)
{
this.realtimes = realtimes == null ? new HashSet<>() : new HashSet<>(realtimes);
Expand Down Expand Up @@ -86,9 +87,9 @@ private void addRealtime(ServerHolder serverHolder)
private void addHistorical(ServerHolder serverHolder)
{
final ImmutableDruidServer server = serverHolder.getServer();
final MinMaxPriorityQueue<ServerHolder> tierServers = historicals.computeIfAbsent(
final NavigableSet<ServerHolder> tierServers = historicals.computeIfAbsent(
server.getTier(),
k -> MinMaxPriorityQueue.orderedBy(Ordering.natural().reverse()).create()
k -> new TreeSet<>(Collections.reverseOrder())
);
tierServers.add(serverHolder);
}
Expand All @@ -98,7 +99,7 @@ public Set<ServerHolder> getRealtimes()
return realtimes;
}

public Map<String, MinMaxPriorityQueue<ServerHolder>> getHistoricals()
public Map<String, NavigableSet<ServerHolder>> getHistoricals()
{
return historicals;
}
Expand All @@ -108,7 +109,7 @@ public Iterable<String> getTierNames()
return historicals.keySet();
}

public MinMaxPriorityQueue<ServerHolder> getHistoricalsByTier(String tier)
public NavigableSet<ServerHolder> getHistoricalsByTier(String tier)
{
return historicals.get(tier);
}
Expand All @@ -124,7 +125,7 @@ public Collection<ServerHolder> getAllServers()
return allServers;
}

public Iterable<MinMaxPriorityQueue<ServerHolder>> getSortedHistoricalsByTier()
public Iterable<NavigableSet<ServerHolder>> getSortedHistoricalsByTier()
{
return historicals.values();
}
Expand All @@ -146,7 +147,7 @@ public boolean hasRealtimes()

public boolean hasTier(String tier)
{
MinMaxPriorityQueue<ServerHolder> servers = historicals.get(tier);
return (servers == null) || servers.isEmpty();
NavigableSet<ServerHolder> servers = historicals.get(tier);
return (servers != null) && !servers.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Table;
import io.druid.client.ImmutableDruidServer;
import io.druid.timeline.DataSegment;

import java.util.Map;
import java.util.SortedSet;

/**
* A lookup for the number of replicants of a given segment for a certain tier.
Expand All @@ -38,7 +38,7 @@ public static SegmentReplicantLookup make(DruidCluster cluster)
final Table<String, String, Integer> segmentsInCluster = HashBasedTable.create();
final Table<String, String, Integer> loadingSegments = HashBasedTable.create();

for (MinMaxPriorityQueue<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (SortedSet<ServerHolder> serversByType : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serversByType) {
ImmutableDruidServer server = serverHolder.getServer();

Expand Down
32 changes: 24 additions & 8 deletions server/src/main/java/io/druid/server/coordinator/ServerHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.server.coordinator;

import com.google.common.primitives.Longs;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;
Expand Down Expand Up @@ -52,32 +53,32 @@ public LoadQueuePeon getPeon()
return peon;
}

public Long getMaxSize()
public long getMaxSize()
{
return server.getMaxSize();
}

public Long getCurrServerSize()
public long getCurrServerSize()
{
return server.getCurrSize();
}

public Long getLoadQueueSize()
public long getLoadQueueSize()
{
return peon.getLoadQueueSize();
}

public Long getSizeUsed()
public long getSizeUsed()
{
return getCurrServerSize() + getLoadQueueSize();
}

public Double getPercentUsed()
public double getPercentUsed()
{
return (100 * getSizeUsed().doubleValue()) / getMaxSize();
return (100.0 * getSizeUsed()) / getMaxSize();
}

public Long getAvailableSize()
public long getAvailableSize()
{
long maxSize = getMaxSize();
long sizeUsed = getSizeUsed();
Expand Down Expand Up @@ -114,7 +115,22 @@ public int getNumberOfSegmentsInQueue()
@Override
public int compareTo(ServerHolder serverHolder)
{
return getAvailableSize().compareTo(serverHolder.getAvailableSize());
int result = Longs.compare(getAvailableSize(), serverHolder.getAvailableSize());
if (result != 0) {
return result;
}

result = server.getHost().compareTo(serverHolder.server.getHost());
if (result != 0) {
return result;
}

result = server.getTier().compareTo(serverHolder.server.getTier());
if (result != 0) {
return result;
}

return server.getType().compareTo(serverHolder.server.getType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package io.druid.server.coordinator.helper;

import com.google.common.collect.Lists;
import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.EmittingLogger;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.StringUtils;
Expand All @@ -38,6 +37,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -78,7 +79,7 @@ protected void reduceLifetimes(String tier)
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
final CoordinatorStats stats = new CoordinatorStats();
params.getDruidCluster().getHistoricals().forEach((String tier, MinMaxPriorityQueue<ServerHolder> servers) -> {
params.getDruidCluster().getHistoricals().forEach((String tier, NavigableSet<ServerHolder> servers) -> {
balanceTier(params, tier, servers, stats);
});
return params.buildFromExisting().withCoordinatorStats(stats).build();
Expand All @@ -87,7 +88,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
private void balanceTier(
DruidCoordinatorRuntimeParams params,
String tier,
MinMaxPriorityQueue<ServerHolder> servers,
SortedSet<ServerHolder> servers,
CoordinatorStats stats
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package io.druid.server.coordinator.helper;

import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;

import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.guava.Comparators;
Expand All @@ -34,6 +32,7 @@
import io.druid.timeline.VersionedIntervalTimeline;

import java.util.Map;
import java.util.SortedSet;

public class DruidCoordinatorCleanupOvershadowed implements DruidCoordinatorHelper
{
Expand All @@ -55,7 +54,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
DruidCluster cluster = params.getDruidCluster();
Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = Maps.newHashMap();

for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package io.druid.server.coordinator.helper;

import com.google.common.collect.MinMaxPriorityQueue;

import io.druid.client.ImmutableDruidDataSource;
import io.druid.client.ImmutableDruidServer;
import io.druid.java.util.common.logger.Logger;
Expand All @@ -34,6 +32,7 @@
import io.druid.timeline.DataSegment;

import java.util.Set;
import java.util.SortedSet;

/**
*/
Expand Down Expand Up @@ -64,7 +63,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
// This is done to prevent a race condition in which the coordinator would drop all segments if it started running
// cleanup before it finished polling the metadata storage for available segments for the first time.
if (!availableSegments.isEmpty()) {
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package io.druid.server.coordinator.helper;

import com.google.common.collect.MinMaxPriorityQueue;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.DruidDataSource;
Expand Down Expand Up @@ -171,7 +170,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
);

log.info("Load Queues:");
for (MinMaxPriorityQueue<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (Iterable<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();
LoadQueuePeon queuePeon = serverHolder.getPeon();
Expand Down
Loading