Merged
18 changes: 6 additions & 12 deletions docs/operations/rule-configuration.md
Original file line number Diff line number Diff line change
@@ -170,22 +170,23 @@ The interval of a segment will be compared against the specified period. The per

## Broadcast Rules

Broadcast rules indicate how segments of different datasources should be co-located in Historical processes.
Once a broadcast rule is configured for a datasource, all segments of the datasource are broadcasted to the servers holding _any segments_ of the co-located datasources.
Broadcast rules indicate that segments of a datasource should be loaded by all servers of the following types in the cluster: historicals, brokers, tasks, and indexers.

Note that the broadcast segments are only directly queryable through the historicals, but they are currently loaded on other server types to support join queries.

### Forever Broadcast Rule

Forever broadcast rules are of the form:

```json
{
"type" : "broadcastForever",
"colocatedDataSources" : [ "target_source1", "target_source2" ]
"type" : "broadcastForever"
}
```

* `type` - this should always be "broadcastForever"
* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster.

This rule applies to all segments of a datasource, covering all intervals.
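
For illustration, a datasource's full rule list could consist of a single forever-broadcast rule. The datasource name `dim_lookup` here is a hypothetical example, and this assumes the usual rule-chain behavior where rules are evaluated in order and the first matching rule applies:

```json
[
  {
    "type" : "broadcastForever"
  }
]
```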

### Interval Broadcast Rule

@@ -194,13 +195,11 @@ Interval broadcast rules are of the form:
```json
{
"type" : "broadcastByInterval",
"colocatedDataSources" : [ "target_source1", "target_source2" ],
"interval" : "2012-01-01/2013-01-01"
}
```

* `type` - this should always be "broadcastByInterval"
* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster.
* `interval` - A JSON Object representing an ISO-8601 interval. Only segments within the interval will be broadcast.

### Period Broadcast Rule
@@ -210,22 +209,17 @@ Period broadcast rules are of the form:
```json
{
"type" : "broadcastByPeriod",
"colocatedDataSources" : [ "target_source1", "target_source2" ],
"period" : "P1M",
"includeFuture" : true
}
```

* `type` - this should always be "broadcastByPeriod"
* `colocatedDataSources` - A JSON List containing datasource names to be co-located. `null` and empty list means broadcasting to every process in the cluster.
* `period` - A JSON Object representing an ISO-8601 period
* `includeFuture` - A JSON Boolean indicating whether the load period should include the future. This property is optional; the default is `true`.

The interval of a segment will be compared against the specified period. The period extends from some time in the past to the future or to the current time, depending on whether `includeFuture` is true or false. The rule matches if the period *overlaps* the interval.
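
As an illustrative sketch only (Druid's actual rules use Joda-Time periods), the overlap check described above can be expressed as follows. The class name `PeriodMatchSketch`, the method `matches`, and the use of a fixed 30-day `Duration` as a stand-in for `P1M` are all assumptions for this example:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the period-overlap check described above.
// Druid's real rule uses Joda-Time Periods; Duration.ofDays(30) stands in for "P1M".
class PeriodMatchSketch
{
  // The period window runs from (now - period) up to now, or up to the
  // indefinite future when includeFuture is true. The rule matches when
  // that window overlaps the segment's [start, end) interval.
  static boolean matches(
      Instant segmentStart,
      Instant segmentEnd,
      Duration period,
      boolean includeFuture,
      Instant now
  )
  {
    Instant windowStart = now.minus(period);
    Instant windowEnd = includeFuture ? Instant.MAX : now;
    return segmentStart.isBefore(windowEnd) && segmentEnd.isAfter(windowStart);
  }
}
```

Note that with `includeFuture` set to false, a segment whose interval starts before the current time still matches even if it extends into the future, because only overlap is required.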


## Permanently deleting data

Druid can fully drop data from the cluster, wipe the metadata store entry, and remove the data from deep storage for any
@@ -58,19 +58,17 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)

for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();

for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines
.computeIfAbsent(
dataSource.getName(),
dsName -> new VersionedIntervalTimeline<>(Comparator.naturalOrder())
);
VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
}
addSegmentsFromServer(serverHolder, timelines);
}
}

for (ServerHolder serverHolder : cluster.getBrokers()) {
addSegmentsFromServer(serverHolder, timelines);
}

// Note that we do not include segments from ingestion services such as tasks or indexers,
// to prevent unpublished segments from prematurely overshadowing segments.

// Mark all segments as unused in db that are overshadowed by served segments
for (DataSegment dataSegment : params.getUsedSegments()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
@@ -83,4 +81,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)

return params.buildFromExisting().withCoordinatorStats(stats).build();
}

private void addSegmentsFromServer(
ServerHolder serverHolder,
Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines
)
{
ImmutableDruidServer server = serverHolder.getServer();

for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines
.computeIfAbsent(
dataSource.getName(),
dsName -> new VersionedIntervalTimeline<>(Comparator.naturalOrder())
);
VersionedIntervalTimeline.addSegments(timeline, dataSource.getSegments().iterator());
}
}
}
@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator.duty;

import com.google.common.collect.Lists;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataRuleManager;
@@ -103,7 +104,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)

final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
int missingRules = 0;

final Set<String> broadcastDatasources = new HashSet<>();
for (ImmutableDruidDataSource dataSource : params.getDataSourcesSnapshot().getDataSourcesMap().values()) {
List<Rule> rules = databaseRuleManager.getRulesWithDefault(dataSource.getName());
for (Rule rule : rules) {
// A datasource is considered a broadcast datasource if it has any broadcast rules.
// The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules
// executes before BalanceSegments.
if (rule instanceof BroadcastDistributionRule) {
broadcastDatasources.add(dataSource.getName());
break;
}
}
}

for (DataSegment segment : params.getUsedSegments()) {
if (overshadowed.contains(segment.getId())) {
// Skipping overshadowed segments
@@ -115,12 +130,6 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
if (rule.appliesTo(segment, now)) {
stats.accumulate(rule.run(coordinator, paramsWithReplicationManager, segment));
foundMatchingRule = true;

// The set of broadcast datasources is used by BalanceSegments, so it's important that RunRules
// executes before BalanceSegments
if (rule instanceof BroadcastDistributionRule) {
broadcastDatasources.add(segment.getDataSource());
}
break;
}
}
@@ -27,13 +27,18 @@
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.LoadQueuePeon;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;

/**
* Unloads segments that are no longer marked as used from Historical servers.
* Unloads segments that are no longer marked as used from servers.
*/
public class UnloadUnusedSegments implements CoordinatorDuty
{
@@ -46,31 +51,102 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
Set<DataSegment> usedSegments = params.getUsedSegments();
DruidCluster cluster = params.getDruidCluster();

Map<String, Boolean> broadcastStatusByDatasource = new HashMap<>();
for (String broadcastDatasource : params.getBroadcastDatasources()) {
broadcastStatusByDatasource.put(broadcastDatasource, true);
}

for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();
handleUnusedSegmentsForServer(
serverHolder,
usedSegments,
params,
stats,
false,
broadcastStatusByDatasource
);
}
}

for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
for (DataSegment segment : dataSource.getSegments()) {
if (!usedSegments.contains(segment)) {
LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
for (ServerHolder serverHolder : cluster.getBrokers()) {
handleUnusedSegmentsForServer(
serverHolder,
usedSegments,
params,
stats,
false,
broadcastStatusByDatasource
);
}

for (ServerHolder serverHolder : cluster.getRealtimes()) {
handleUnusedSegmentsForServer(
serverHolder,
usedSegments,
params,
stats,
true,
broadcastStatusByDatasource
);
}

return params.buildFromExisting().withCoordinatorStats(stats).build();
}

if (!queuePeon.getSegmentsToDrop().contains(segment)) {
queuePeon.dropSegment(segment, () -> {});
stats.addToTieredStat("unneededCount", server.getTier(), 1);
log.info(
"Dropping unneeded segment [%s] from server [%s] in tier [%s]",
segment.getId(),
server.getName(),
server.getTier()
);
private void handleUnusedSegmentsForServer(
ServerHolder serverHolder,
Set<DataSegment> usedSegments,
DruidCoordinatorRuntimeParams params,
CoordinatorStats stats,
boolean dropBroadcastOnly,
Map<String, Boolean> broadcastStatusByDatasource
)
{
ImmutableDruidServer server = serverHolder.getServer();
for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
boolean isBroadcastDatasource = broadcastStatusByDatasource.computeIfAbsent(
dataSource.getName(),
(dataSourceName) -> {
List<Rule> rules = params.getDatabaseRuleManager().getRulesWithDefault(dataSource.getName());
for (Rule rule : rules) {
// A datasource is considered a broadcast datasource if it has any broadcast rules.
if (rule instanceof BroadcastDistributionRule) {
return true;
}
}
return false;
}
);

// The coordinator tracks used segments by examining the metadata store.
// For tasks, the segments they create are unpublished, so those segments will get dropped
// unless we exclude them here. We currently drop only broadcast segments in that case.
// This check relies on the assumption that queryable stream tasks will never
// ingest data to a broadcast datasource. If a broadcast datasource is switched to become a non-broadcast
// datasource, this will result in those segments not being dropped from tasks.
// A more robust solution which requires a larger rework could be to expose
// the set of segments that were created by a task/indexer here, and exclude them.
if (dropBroadcastOnly && !isBroadcastDatasource) {
continue;
}

for (DataSegment segment : dataSource.getSegments()) {
if (!usedSegments.contains(segment)) {
LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());

if (!queuePeon.getSegmentsToDrop().contains(segment)) {
queuePeon.dropSegment(segment, () -> {});
stats.addToTieredStat("unneededCount", server.getTier(), 1);
log.info(
"Dropping unneeded segment [%s] from server [%s] in tier [%s]",
segment.getId(),
server.getName(),
server.getTier()
);
}
}
}
}

return params.buildFromExisting().withCoordinatorStats(stats).build();
}
}
@@ -21,6 +21,8 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.java.util.common.DateTimes;
@@ -39,9 +41,11 @@
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.List;

@RunWith(JUnitParamsRunner.class)
public class MarkAsUnusedOvershadowedSegmentsTest
{
MarkAsUnusedOvershadowedSegments markAsUnusedOvershadowedSegments;
@@ -69,8 +73,16 @@ public class MarkAsUnusedOvershadowedSegmentsTest
.build();

@Test
public void testRun()
@Parameters(
{
"historical",
"broker"
}
)
public void testRun(String serverTypeString)
{
ServerType serverType = ServerType.fromString(serverTypeString);

markAsUnusedOvershadowedSegments =
new MarkAsUnusedOvershadowedSegments(coordinator);
usedSegments = ImmutableList.of(segmentV1, segmentV0, segmentV2);
@@ -95,7 +107,7 @@ public void testRun()
.andReturn("")
.anyTimes();
EasyMock.expect(druidServer.getType())
.andReturn(ServerType.HISTORICAL)
.andReturn(serverType)
.anyTimes();

EasyMock.expect(druidServer.getDataSources())