Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

package io.druid.client.selector;

import io.druid.java.util.common.ISE;
import com.google.common.collect.Iterables;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;

import java.util.Map;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

/**
*/
Expand All @@ -39,23 +40,27 @@ public AbstractTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrateg

@Override
public QueryableDruidServer pick(
TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment
)
{
final Map.Entry<Integer, Set<QueryableDruidServer>> priorityServers = prioritizedServers.pollFirstEntry();

if (priorityServers == null) {
return null;
}
return Iterables.getOnlyElement(pick(prioritizedServers, segment, 1), null);
}

final Set<QueryableDruidServer> servers = priorityServers.getValue();
switch (servers.size()) {
case 0:
throw new ISE("[%s] Something hella weird going on here. We should not be here", segment.getIdentifier());
case 1:
return priorityServers.getValue().iterator().next();
default:
return serverSelectorStrategy.pick(servers, segment);
@Override
public List<QueryableDruidServer> pick(
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick
)
{
List<QueryableDruidServer> result = new ArrayList<>(numServersToPick);
for (Set<QueryableDruidServer> priorityServers : prioritizedServers.values()) {
result.addAll(serverSelectorStrategy.pick(priorityServers, segment, numServersToPick - result.size()));
if (result.size() == numServersToPick) {
break;
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

package io.druid.client.selector;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import io.druid.timeline.DataSegment;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
Expand All @@ -42,4 +45,15 @@ public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment
{
return Collections.min(servers, comparator);
}

@Override
public List<QueryableDruidServer> pick(
Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick
)
{
if (servers.size() <= numServersToPick) {
return ImmutableList.copyOf(servers);
}
return Ordering.from(comparator).leastOf(servers, numServersToPick);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,32 @@

package io.druid.client.selector;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import io.druid.timeline.DataSegment;

import java.util.Random;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

public class RandomServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Random random = new Random();

@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)
{
return Iterators.get(servers.iterator(), random.nextInt(servers.size()));
return Iterators.get(servers.iterator(), ThreadLocalRandom.current().nextInt(servers.size()));
}

@Override
public List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick)
{
if (servers.size() <= numServersToPick) {
return ImmutableList.copyOf(servers);
}
List<QueryableDruidServer> list = Lists.newArrayList(servers);
Collections.shuffle(list, ThreadLocalRandom.current());
return ImmutableList.copyOf(list.subList(0, numServersToPick));
}
}
79 changes: 33 additions & 46 deletions server/src/main/java/io/druid/client/selector/ServerSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,23 @@

package io.druid.client.selector;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.emitter.EmittingLogger;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
*/
public class ServerSelector implements DiscoverySelector<QueryableDruidServer>
{

private static final EmittingLogger log = new EmittingLogger(ServerSelector.class);

private final Set<QueryableDruidServer> servers = Sets.newHashSet();
private final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> servers;

private final TierSelectorStrategy strategy;

Expand All @@ -50,8 +46,9 @@ public ServerSelector(
TierSelectorStrategy strategy
)
{
this.segment = new AtomicReference<DataSegment>(segment);
this.segment = new AtomicReference<>(segment);
this.strategy = strategy;
this.servers = new Int2ObjectRBTreeMap<>(strategy.getComparator());
}

public DataSegment getSegment()
Expand All @@ -65,14 +62,25 @@ public void addServerAndUpdateSegment(
{
synchronized (this) {
this.segment.set(segment);
servers.add(server);
int priority = server.getServer().getPriority();
Set<QueryableDruidServer> priorityServers = servers.computeIfAbsent(priority, p -> new HashSet<>());
priorityServers.add(server);
}
}

public boolean removeServer(QueryableDruidServer server)
{
synchronized (this) {
return servers.remove(server);
int priority = server.getServer().getPriority();
Set<QueryableDruidServer> priorityServers = servers.get(priority);
if (priorityServers == null) {
return false;
}
boolean result = priorityServers.remove(server);
if (priorityServers.isEmpty()) {
servers.remove(priority);
}
return result;
}
}

Expand All @@ -84,49 +92,28 @@ public boolean isEmpty()
}

public List<DruidServerMetadata> getCandidates(final int numCandidates) {
List<DruidServerMetadata> result = Lists.newArrayList();
synchronized (this) {
final DataSegment target = segment.get();
for (Map.Entry<Integer, Set<QueryableDruidServer>> entry : toPrioritizedServers().entrySet()) {
Set<QueryableDruidServer> servers = entry.getValue();
TreeMap<Integer, Set<QueryableDruidServer>> tieredMap = Maps.newTreeMap();
while (!servers.isEmpty()) {
tieredMap.put(entry.getKey(), servers); // strategy.pick() removes entry
QueryableDruidServer server = strategy.pick(tieredMap, target);
if (server == null) {
// regard this as any server in tieredMap is not appropriate
break;
}
result.add(server.getServer().getMetadata());
if (numCandidates > 0 && result.size() >= numCandidates) {
return result;
}
servers.remove(server);
}
if (numCandidates > 0) {
return strategy.pick(servers, segment.get(), numCandidates)
.stream()
.map(server -> server.getServer().getMetadata())
.collect(Collectors.toList());
} else {
// return all servers as candidates
return servers.values()
.stream()
.flatMap(Collection::stream)
.map(server -> server.getServer().getMetadata())
.collect(Collectors.toList());
}
}
return result;
}

@Override
public QueryableDruidServer pick()
{
synchronized (this) {
return strategy.pick(toPrioritizedServers(), segment.get());
}
}

private TreeMap<Integer, Set<QueryableDruidServer>> toPrioritizedServers()
{
final TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers = new TreeMap<>(strategy.getComparator());
for (QueryableDruidServer server : servers) {
Set<QueryableDruidServer> theServers = prioritizedServers.get(server.getServer().getPriority());
if (theServers == null) {
theServers = Sets.newHashSet();
prioritizedServers.put(server.getServer().getPriority(), theServers);
}
theServers.add(server);
return strategy.pick(servers, segment.get());
}
return prioritizedServers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment;

import java.util.List;
import java.util.Set;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RandomServerSelectorStrategy.class)
Expand All @@ -32,5 +33,7 @@
})
public interface ServerSelectorStrategy
{
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);
QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment);

List<QueryableDruidServer> pick(Set<QueryableDruidServer> servers, DataSegment segment, int numServersToPick);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;

import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;

/**
*/
Expand All @@ -37,7 +38,13 @@
})
public interface TierSelectorStrategy
{
public Comparator<Integer> getComparator();
Comparator<Integer> getComparator();

public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);
QueryableDruidServer pick(Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers, DataSegment segment);

List<QueryableDruidServer> pick(
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick
);
}
Loading