Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions docs/content/operations/rule-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ layout: doc_page
---
# Retaining or Automatically Dropping Data

Coordinator nodes use rules to determine what data should be loaded or dropped from the cluster. Rules are used for data retention and are set on the coordinator console (http://coordinator_ip:port).
Coordinator nodes use rules to determine what data should be loaded to or dropped from the cluster. Rules are used for data retention and query execution, and are set on the coordinator console (http://coordinator_ip:port).

Rules indicate how segments should be assigned to different historical node tiers and how many replicas of a segment should exist in each tier. Rules may also indicate when segments should be dropped entirely from the cluster. The coordinator loads a set of rules from the metadata storage. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule.
There are three types of rules, i.e., load rules, drop rules, and broadcast rules. Load rules indicate how segments should be assigned to different historical node tiers and how many replicas of a segment should exist in each tier.
Drop rules indicate when segments should be dropped entirely from the cluster. Finally, broadcast rules indicate how segments of different data sources should be co-located in historical nodes.

The coordinator loads a set of rules from the metadata storage. Rules may be specific to a certain datasource and/or a default set of rules can be configured. Rules are read in order and hence the ordering of rules is important. The coordinator will cycle through all available segments and match each segment with the first rule that applies. Each segment may only match a single rule.

Note: It is recommended that the coordinator console is used to configure rules. However, the coordinator node does have HTTP endpoints to programmatically configure rules.

Expand Down Expand Up @@ -126,6 +129,65 @@ Period drop rules are of the form:

The interval of a segment will be compared against the specified period. The period is from some time in the past to the current time. The rule matches if the period contains the interval.

Broadcast Rules
---------------

Broadcast rules indicate how segments of different data sources should be co-located in historical nodes.
Once a broadcast rule is configured for a data source, all segments of the data source are broadcasted to the servers holding _any segments_ of the co-located data sources.

### Forever Broadcast Rule

Forever broadcast rules are of the form:

```json
{
"type" : "broadcastForever",
"colocatedDataSources" : [ "target_source1", "target_source2" ]
}
```

* `type` - this should always be "broadcastForever"
* `colocatedDataSources` - A JSON List containing data source names to be co-located. `null` and empty list means broadcasting to every node in the cluster.

### Interval Broadcast Rule

Interval broadcast rules are of the form:

```json
{
"type" : "broadcastByInterval",
"colocatedDataSources" : [ "target_source1", "target_source2" ],
"interval" : "2012-01-01/2013-01-01"
}
```

* `type` - this should always be "broadcastByInterval"
* `colocatedDataSources` - A JSON List containing data source names to be co-located. `null` and empty list means broadcasting to every node in the cluster.
* `interval` - A JSON Object representing ISO-8601 Periods. Only the segments of the interval will be broadcasted.

### Period Broadcast Rule

Period broadcast rules are of the form:

```json
{
"type" : "broadcastByPeriod",
"colocatedDataSources" : [ "target_source1", "target_source2" ],
"period" : "P1M"
}
```

* `type` - this should always be "broadcastByPeriod"
* `colocatedDataSources` - A JSON List containing data source names to be co-located. `null` and empty list means broadcasting to every node in the cluster.
* `period` - A JSON Object representing ISO-8601 Periods

The interval of a segment will be compared against the specified period. The period is from some time in the past to the current time. The rule matches if the period contains the interval.

<div class="note caution">
broadcast rules don't guarantee that segments of the data sources are always co-located because segments for the colocated data sources are not loaded together atomically.
If you want to always co-locate the segments of some data sources together, it is recommended to leave colocatedDataSources empty.
</div>

# Permanently Deleting Data

Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any segments that are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public Iterable<ImmutableDruidDataSource> getDataSources()
return dataSources.values();
}

public ImmutableDruidDataSource getDataSource(String name)
{
return dataSources.get(name);
}

public Map<String, DataSegment> getSegments()
{
return segments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import com.google.common.collect.Ordering;
import io.druid.client.ImmutableDruidServer;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Contains a representation of the current state of the cluster by tier.
Expand Down Expand Up @@ -70,6 +73,11 @@ public MinMaxPriorityQueue<ServerHolder> getServersByTier(String tier)
return cluster.get(tier);
}

public List<ServerHolder> getAllServers()
{
return cluster.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
}

public Iterable<MinMaxPriorityQueue<ServerHolder>> getSortedServersByTier()
{
return cluster.values();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class DruidCoordinatorRuntimeParams
private final DateTime balancerReferenceTimestamp;
private final BalancerStrategy balancerStrategy;

public DruidCoordinatorRuntimeParams(
private DruidCoordinatorRuntimeParams(
long startTime,
DruidCluster druidCluster,
MetadataRuleManager databaseRuleManager,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator.rules;

import com.metamx.emitter.EmittingLogger;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import io.druid.server.coordinator.ServerHolder;
import io.druid.timeline.DataSegment;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public abstract class BroadcastDistributionRule implements Rule
{
private static final EmittingLogger log = new EmittingLogger(BroadcastDistributionRule.class);

@Override
public CoordinatorStats run(
DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment
)
{
// Find servers which holds the segments of co-located data source
final Set<ServerHolder> loadServerHolders = new HashSet<>();
final Set<ServerHolder> dropServerHolders = new HashSet<>();
final List<String> colocatedDataSources = getColocatedDataSources();
if (colocatedDataSources == null || colocatedDataSources.isEmpty()) {
loadServerHolders.addAll(params.getDruidCluster().getAllServers());
} else {
params.getDruidCluster().getAllServers().forEach(
eachHolder -> {
if (colocatedDataSources.stream()
.anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) {
loadServerHolders.add(eachHolder);
} else if (eachHolder.isServingSegment(segment)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should skip holders that already have the segment in their drop lists (peon.getSegmentsToDrop()), like DruidCoordinator.moveSegment and DruidCoordinatorCleanupUnneeded.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I added it.

if (!eachHolder.getPeon().getSegmentsToDrop().contains(segment)) {
dropServerHolders.add(eachHolder);
}
}
}
);
}

final CoordinatorStats stats = new CoordinatorStats();

return stats.accumulate(assign(loadServerHolders, segment))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im wondering if the broadcast rules should label their segments a little differently, or at least be able to distinguish from actual Druid segments

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll definitely make reading the logs easier

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think broadcasted segments should be regarded as same as other segments. Every types of queries can be queried on broadcasted segments, but the broadcast information is required by only joins.

The information of which sources are broadcasted or not is required by brokers and historicals (and realtimes in the future). Brokers decides a given query should be distributed to which nodes, and this is determined by selecting the nodes holding the segments of non-broadcasted tables. Historicals, as proposed in #4032, join broadcasted segments first and then subsequently join the result with non-broadcasted segments in parallel.

I'm currently thinking that brokers are able to figure out which tables are broadcasted by looking at BroadcastDistributionRules, and they will add this information to QueryContext for historicals. If this works, broadcast segments can be regarded as normal.

What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jihoonson Okay that makes sense to me. Can broadcast segments be created dynamically?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe possible in the future when we support the feature of broadcasting tables on the fly, but not yet.

.accumulate(drop(dropServerHolders, segment));
}

private CoordinatorStats assign(
final Set<ServerHolder> serverHolders,
final DataSegment segment
)
{
final CoordinatorStats stats = new CoordinatorStats();
stats.addToGlobalStat(LoadRule.ASSIGNED_COUNT, 0);

for (ServerHolder holder : serverHolders) {
if (segment.getSize() > holder.getAvailableSize()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that like CostBalancerStrategy, this should check if holder.isLoadingSegment(segment) and skip if so.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I added it.

log.makeAlert("Failed to broadcast segment for [%s]", segment.getDataSource())
.addData("segmentId", segment.getIdentifier())
.addData("segmentSize", segment.getSize())
.addData("hostName", holder.getServer().getHost())
.addData("availableSize", holder.getAvailableSize())
.emit();
} else {
if (!holder.isLoadingSegment(segment)) {
holder.getPeon().loadSegment(
segment,
null
);

stats.addToGlobalStat(LoadRule.ASSIGNED_COUNT, 1);
}
}
}

return stats;
}

private CoordinatorStats drop(
final Set<ServerHolder> serverHolders,
final DataSegment segment
)
{
CoordinatorStats stats = new CoordinatorStats();

for (ServerHolder holder : serverHolders) {
holder.getPeon().dropSegment(segment, null);
stats.addToGlobalStat(LoadRule.DROPPED_COUNT, 1);
}

return stats;
}

public abstract List<String> getColocatedDataSources();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.coordinator.rules;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.List;
import java.util.Objects;

public class ForeverBroadcastDistributionRule extends BroadcastDistributionRule
{
static final String TYPE = "broadcastForever";

private final List<String> colocatedDataSources;

@JsonCreator
public ForeverBroadcastDistributionRule(
@JsonProperty("colocatedDataSources") List<String> colocatedDataSources
)
{
this.colocatedDataSources = colocatedDataSources;
}

@Override
@JsonProperty
public String getType()
{
return TYPE;
}

@Override
@JsonProperty
public List<String> getColocatedDataSources()
{
return colocatedDataSources;
}

@Override
public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
{
return true;
}

@Override
public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
{
return true;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}

if (o == null || o.getClass() != getClass()) {
return false;
}

ForeverBroadcastDistributionRule that = (ForeverBroadcastDistributionRule) o;
return Objects.equals(colocatedDataSources, that.colocatedDataSources);
}

@Override
public int hashCode()
{
return Objects.hash(getType(), colocatedDataSources);
}
}
Loading