Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1789,8 +1789,10 @@ These Broker configurations can be defined in the `broker/runtime.properties` fi
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.broker.balancer.type`|`random`, `connectionCount`|Determines how the broker balances connections to Historical processes. `random` choose randomly, `connectionCount` picks the process with the fewest number of active connections to|`random`|
|`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`|
|`druid.broker.select.tier`|`highestPriority`, `lowestPriority`, `custom`, `preferred`|If segments are cross-replicated across tiers in a cluster, you can tell the broker to prefer to select segments in a tier with a certain priority.|`highestPriority`|
|`druid.broker.select.tier.custom.priorities`|An array of integer priorities, such as `[-1, 0, 1, 2]`|Select servers in tiers with a custom priority list.|The config only has effect if `druid.broker.select.tier` is set to `custom`. If `druid.broker.select.tier` is set to `custom` but this config is not specified, the effect is the same as `druid.broker.select.tier` set to `highestPriority`. Any of the integers in this config can be ignored if there's no corresponding tiers with such priorities. Tiers with priorities explicitly specified in this config always have higher priority than those not and those not specified fall back to use `highestPriority` strategy among themselves.|
|`druid.broker.select.tier.preferred.tier`| The preferred tier name. E.g., `_default_tier` | A non-empty value that specifies the preferred tier in which historical servers will be picked up for queries. If there are not enough historical servers from the preferred tier, servers from other tiers (if there are any) will be selected. This config only has effect if `druid.broker.select.tier` is set to `preferred` | null |
|`druid.broker.select.tier.preferred.priority`| `highest`, `lowest` | If there are multiple candidates in a preferred tier, specifies the priority to pick up candidates. By default, the higher priority a historical, the higher chances it will be picked up. This config only has effect if `druid.broker.select.tier` is set to `preferred`| `highest` |

##### Query prioritization and laning

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*/
public abstract class AbstractTierSelectorStrategy implements TierSelectorStrategy
{
private final ServerSelectorStrategy serverSelectorStrategy;
protected final ServerSelectorStrategy serverSelectorStrategy;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be restored to private now that PreferredTierSelectorStrategy is delegating to highest or lowest priority strategy instead of calling server selector direct

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted. thanks for pointing out


public AbstractTierSelectorStrategy(ServerSelectorStrategy serverSelectorStrategy)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.client.selector;


import com.fasterxml.jackson.annotation.JacksonInject;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.apache.druid.client.QueryableDruidServer;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
import org.apache.druid.timeline.DataSegment;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PreferredTierSelectorStrategy extends AbstractTierSelectorStrategy
{
private static final Logger log = new Logger(PreferredTierSelectorStrategy.class);

private final String preferredTier;
private final TierSelectorStrategy priorityStrategy;

public PreferredTierSelectorStrategy(
@JacksonInject ServerSelectorStrategy serverSelectorStrategy,
@JacksonInject PreferredTierSelectorStrategyConfig config
)
{
super(serverSelectorStrategy);
this.preferredTier = config.getTier();

if (config.getPriority() == null) {
this.priorityStrategy = new HighestPriorityTierSelectorStrategy(serverSelectorStrategy);
} else {
if ("highest".equalsIgnoreCase(config.getPriority())) {
this.priorityStrategy = new HighestPriorityTierSelectorStrategy(serverSelectorStrategy);
} else if ("lowest".equalsIgnoreCase(config.getPriority())) {
this.priorityStrategy = new LowestPriorityTierSelectorStrategy(serverSelectorStrategy);
} else {
throw new IAE("druid.broker.select.tier.preferred.priority must be either 'highest' or 'lowest'");
}
}
}

@Override
public Comparator<Integer> getComparator()
{
return priorityStrategy.getComparator();
}

@Override
public <T> List<QueryableDruidServer> pick(
Query<T> query,
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> prioritizedServers,
DataSegment segment,
int numServersToPick
)
{
if (log.isDebugEnabled()) {
log.debug(
"Picking [%d] servers from preferred tier [%s] for segment [%s] with priority [%s]",
numServersToPick, preferredTier, segment.getId(), this.priorityStrategy.getClass().getSimpleName()
);
}

Int2ObjectRBTreeMap<Set<QueryableDruidServer>> preferred = new Int2ObjectRBTreeMap<>(priorityStrategy.getComparator());
Int2ObjectRBTreeMap<Set<QueryableDruidServer>> nonPreferred = new Int2ObjectRBTreeMap<>(priorityStrategy.getComparator());
for (Set<QueryableDruidServer> priorityServers : prioritizedServers.values()) {
for (QueryableDruidServer server : priorityServers) {
if (preferredTier.equals(server.getServer().getMetadata().getTier())) {
preferred.computeIfAbsent(server.getServer().getPriority(), k -> new HashSet<>())
.add(server);
} else {
nonPreferred.computeIfAbsent(server.getServer().getPriority(), k -> new HashSet<>())
.add(server);
}
}
}

List<QueryableDruidServer> picks = new ArrayList<>(numServersToPick);
if (!preferred.isEmpty()) {
// If we have preferred servers, pick them first
picks.addAll(priorityStrategy.pick(query, preferred, segment, numServersToPick));
}

if (picks.size() < numServersToPick && !nonPreferred.isEmpty()) {
// If we don't have enough preferred servers, pick from the non-preferred ones
int remaining = numServersToPick - picks.size();
picks.addAll(priorityStrategy.pick(query, nonPreferred, segment, remaining));
}

return picks;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.druid.client.selector;


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

public class PreferredTierSelectorStrategyConfig
{
@JsonProperty
private String tier;

/**
* Only two options: high or low
*/
@JsonProperty
private String priority;

@JsonCreator
public PreferredTierSelectorStrategyConfig(
@JsonProperty("tier") String tier,
@JsonProperty("priority") String priority
)
{
Preconditions.checkState(tier != null && !tier.isEmpty(),
"druid.broker.select.tier.preferred.tier can't be empty");
this.tier = tier.trim();
this.priority = priority;
}

public String getTier()
{
return tier;
}

public void setTier(String tier)
{
this.tier = tier;
}

public String getPriority()
{
return priority;
}

public void setPriority(String priority)
{
this.priority = priority;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "highestPriority", value = HighestPriorityTierSelectorStrategy.class),
@JsonSubTypes.Type(name = "lowestPriority", value = LowestPriorityTierSelectorStrategy.class),
@JsonSubTypes.Type(name = "custom", value = CustomTierSelectorStrategy.class)
@JsonSubTypes.Type(name = "custom", value = CustomTierSelectorStrategy.class),
@JsonSubTypes.Type(name = "preferred", value = PreferredTierSelectorStrategy.class),
})
public interface TierSelectorStrategy
{
Expand Down
Loading
Loading