Merged
3 changes: 1 addition & 2 deletions docs/content/design/coordinator.md
@@ -52,8 +52,7 @@ Segments can be automatically loaded and dropped from the cluster based on a set

### Cleaning Up Segments

Each run, the Druid Coordinator compares the list of available database segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped.
Note that if all segments in database are deleted (or marked unused), then Coordinator will not drop anything from the Historicals. This is done to prevent a race condition in which the Coordinator would drop all segments if it started running cleanup before it finished polling the database for available segments for the first time and believed that there were no segments.
Each run, the Druid coordinator compares the list of available database segments in the database with the current segments in the cluster. Segments that are not in the database but are still being served in the cluster are flagged and appended to a removal list. Segments that are overshadowed (their versions are too old and their data has been replaced by newer segments) are also dropped.

### Segment Availability

@@ -58,14 +58,25 @@ public interface MetadataSegmentManager
@Nullable
ImmutableDruidDataSource getDataSource(String dataSourceName);

/**
* Returns a collection of known datasources.
*
* Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
* not yet been polled.)
*/
@Nullable
Collection<ImmutableDruidDataSource> getDataSources();

/**
* Returns an iterable to go over all segments in all data sources. The order in which segments are iterated is
* unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
* (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
* several times.
*
* Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
* not yet been polled.)
*/
@Nullable
Iterable<DataSegment> iterateAllSegments();

Collection<String> getAllDataSourceNames();
@@ -66,13 +66,15 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

/**
*
*/
@ManageLifecycle
public class SQLMetadataSegmentManager implements MetadataSegmentManager
@@ -102,9 +104,14 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final SQLMetadataConnector connector;

private ConcurrentHashMap<String, DruidDataSource> dataSources = new ConcurrentHashMap<>();
// Volatile since this reference is reassigned in "poll" and then read from in other threads.
Member:

This comment doesn't explain why the field needs to be volatile. The underlying reason is that the field is effectively a lazily initialized field, and the absence of volatile may lead to NPE unless the rest of the code always reads the field into local variables before using it, which is too much of a burden for developers: https://github.com/code-review-checklists/java-concurrency#safe-local-dcl

(Actually, since you translated all code to monadic Optional use of dataSources with a single read, it does not need to be volatile, but I would say that those monadic Optional chains are worse than a simple if-else.)

Member:

OK, because of this: #7447 (comment), the previous comment is irrelevant; there is actually no reason why the field should be volatile in the current version of the code.

Contributor (author):

Are you saying it is fine to have a field that is written from one thread, and read from another, with no synchronization or volatile marker, as long as each reader reads it into a local variable first? My understanding of the JMM is that in this case there's no happens-before relationship established, and all bets are off - readers have no guarantees around ever reading anything nonnull (although in practice they probably will, but that's not something you'd want to depend on).

Member:

Practically, as you noted, it doesn't matter (on the x86 platform, which Druid targets). Formally, volatile is still not enough to ensure "ever reading non-null" before Java 9, where it was formalized in this document.
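To make the pattern under discussion concrete, here is a minimal sketch (class, field, and method names are hypothetical, not the actual SQLMetadataSegmentManager code) of a field that transitions between null and non-null and is therefore read into a local exactly once per method:

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch (hypothetical names): a field that can flip between
// null ("never polled", or stopped) and non-null (polled) must be read into
// a local once per method; a second read could observe null if stop() runs
// in between.
class SnapshotHolder {
    private volatile ConcurrentHashMap<String, String> dataSources = null;

    void poll() {
        ConcurrentHashMap<String, String> fresh = new ConcurrentHashMap<>();
        fresh.put("wikipedia", "segments");
        dataSources = fresh; // publish the fully built map atomically
    }

    void stop() {
        dataSources = null; // may transition back to null at any time
    }

    String get(String name) {
        // Single read of the field; dereferencing it twice here could NPE
        // if stop() ran between the null check and the lookup.
        ConcurrentHashMap<String, String> snapshot = dataSources;
        return snapshot == null ? null : snapshot.get(name);
    }
}
```

The snapshot-into-local read is what makes the later null check trustworthy; the field itself can change underneath at any moment.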

// Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map)
@Nullable
private volatile ConcurrentHashMap<String, DruidDataSource> dataSources = null;

/** The number of times this SQLMetadataSegmentManager was started. */
/**
* The number of times this SQLMetadataSegmentManager was started.
*/
private long startCount = 0;
/**
* Equal to the current {@link #startCount} value, if the SQLMetadataSegmentManager is currently started; -1 if
@@ -200,7 +207,7 @@ public void stop()
return;
}

dataSources = new ConcurrentHashMap<>();
dataSources = null;
currentStartOrder = -1;
exec.shutdownNow();
exec = null;
@@ -325,7 +332,7 @@ public boolean removeDataSource(final String dataSource)
).bind("dataSource", dataSource).execute()
);

dataSources.remove(dataSource);
Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource));
Member:

Why do you use Optional.ofNullable(dataSources).ifPresent() instead of

if (dataSources != null) {
  ...
}

in this PR?

Contributor (author):

It was because dataSources can become null after being non-null, if stop() is called. Since stop() could be called at any time, dataSources should only be dereferenced one time per method.

Contributor (author):

I'll add a comment about this - the variable looks at first glance like a lazy-initialization, but it's actually something that can transition back and forth between null and nonnull, so it needs to be handled differently.

Contributor (author):

Added in #7452.
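A minimal sketch contrasting the two styles debated in this thread (the class and field names are hypothetical, not the actual Druid code); both variants read the shared field only once per method, so a concurrent transition back to null cannot cause an NPE between the check and the use:

```java
import java.util.Map;
import java.util.Optional;

// Contrast of the two styles discussed above, over a field that can
// transition between null and non-null at any time.
class StyleContrast {
    private volatile Map<String, Integer> counts = null;

    void setCounts(Map<String, Integer> m) {
        counts = m;
    }

    // Monadic style, as used in the PR; the field is read once, inside
    // Optional.ofNullable(...).
    Integer getMonadic(String key) {
        return Optional.ofNullable(counts).map(m -> m.get(key)).orElse(null);
    }

    // Plain if-else on a local snapshot, as the reviewer suggests.
    Integer getPlain(String key) {
        Map<String, Integer> snapshot = counts; // single read of the field
        if (snapshot == null) {
            return null;
        }
        return snapshot.get(key);
    }
}
```

Both are safe for the same reason; the disagreement in the thread is purely about readability.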


if (removed == 0) {
return false;
@@ -348,18 +355,21 @@ public boolean removeSegment(String dataSourceName, final String segmentId)
// Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a
// potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast.
List<SegmentId> possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId);
dataSources.computeIfPresent(
dataSourceName,
(dsName, dataSource) -> {
for (SegmentId possibleSegmentId : possibleSegmentIds) {
if (dataSource.removeSegment(possibleSegmentId) != null) {
break;
}
}
// Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
Optional.ofNullable(dataSources).ifPresent(
m ->
m.computeIfPresent(
dataSourceName,
(dsName, dataSource) -> {
for (SegmentId possibleSegmentId : possibleSegmentIds) {
if (dataSource.removeSegment(possibleSegmentId) != null) {
break;
}
}
// Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
)
);

return removed;
@@ -375,14 +385,17 @@ public boolean removeSegment(SegmentId segmentId)
{
try {
final boolean removed = removeSegmentFromTable(segmentId.toString());
dataSources.computeIfPresent(
segmentId.getDataSource(),
(dsName, dataSource) -> {
dataSource.removeSegment(segmentId);
// Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
Optional.ofNullable(dataSources).ifPresent(
m ->
m.computeIfPresent(
segmentId.getDataSource(),
(dsName, dataSource) -> {
dataSource.removeSegment(segmentId);
// Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
//noinspection ReturnOfNull
return dataSource.isEmpty() ? null : dataSource;
}
)
);
return removed;
}
@@ -422,23 +435,37 @@ public boolean isStarted()
@Nullable
public ImmutableDruidDataSource getDataSource(String dataSourceName)
{
final DruidDataSource dataSource = dataSources.get(dataSourceName);
final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null);
return dataSource == null ? null : dataSource.toImmutableDruidDataSource();
}

@Override
@Nullable
public Collection<ImmutableDruidDataSource> getDataSources()
{
return dataSources.values()
.stream()
.map(DruidDataSource::toImmutableDruidDataSource)
.collect(Collectors.toList());
return Optional.ofNullable(dataSources)
Member:

Why not just

if (dataSources != null) {
  return dataSources.values().stream().map(...).collect(...);
} else {
 return null;
}

Contributor (author):

Just to avoid reading the dataSources reference twice in the same method. (Same reason as https://github.com/apache/incubator-druid/pull/7447/files/39dcd326be350ca6b66e4de884708cf77413c166#r274563782)

.map(m ->
m.values()
.stream()
.map(DruidDataSource::toImmutableDruidDataSource)
.collect(Collectors.toList())
)
.orElse(null);
}

@Override
@Nullable
public Iterable<DataSegment> iterateAllSegments()
{
return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
final ConcurrentHashMap<String, DruidDataSource> dataSourcesSnapshot = dataSources;
if (dataSourcesSnapshot == null) {
return null;
}

return () -> dataSourcesSnapshot.values()
.stream()
.flatMap(dataSource -> dataSource.getSegments().stream())
.iterator();
}

@Override
@@ -543,6 +570,7 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
.addSegmentIfAbsent(segment);
});

// Replace "dataSources" atomically.
dataSources = newDataSources;
}

@@ -557,7 +585,7 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
*/
private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment)
{
DruidDataSource dataSource = dataSources.get(segment.getDataSource());
DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null);
if (dataSource == null) {
return segment;
}
@@ -74,8 +74,10 @@
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -88,6 +90,7 @@
import java.util.stream.Collectors;

/**
*
*/
@ManageLifecycle
public class DruidCoordinator
@@ -242,7 +245,9 @@ public Map<String, LoadQueuePeon> getLoadManagementPeons()
return loadManagementPeons;
}

/** @return tier -> { dataSource -> underReplicationCount } map */
/**
* @return tier -> { dataSource -> underReplicationCount } map
*/
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
{
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
@@ -251,9 +256,15 @@ public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataS
return underReplicationCountsPerDataSourcePerTier;
}

final Iterable<DataSegment> dataSegments = iterateAvailableDataSegments();

if (dataSegments == null) {
return underReplicationCountsPerDataSourcePerTier;
}

final DateTime now = DateTimes.nowUtc();

for (final DataSegment segment : iterateAvailableDataSegments()) {
for (final DataSegment segment : dataSegments) {
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());

for (final Rule rule : rules) {
@@ -285,7 +296,13 @@ public Object2LongMap<String> getSegmentAvailability()
return retVal;
}

for (DataSegment segment : iterateAvailableDataSegments()) {
final Iterable<DataSegment> dataSegments = iterateAvailableDataSegments();

if (dataSegments == null) {
return retVal;
}

for (DataSegment segment : dataSegments) {
if (segmentReplicantLookup.getLoadedReplicants(segment.getId()) == 0) {
retVal.addTo(segment.getDataSource(), 1);
} else {
@@ -298,8 +315,14 @@

public Map<String, Double> getLoadStatus()
{
Map<String, Double> loadStatus = new HashMap<>();
for (ImmutableDruidDataSource dataSource : metadataSegmentManager.getDataSources()) {
final Map<String, Double> loadStatus = new HashMap<>();
final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();

if (dataSources == null) {
return loadStatus;
}

for (ImmutableDruidDataSource dataSource : dataSources) {
final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments());
final int availableSegmentSize = segments.size();

@@ -453,7 +476,11 @@ public void moveSegment(
* is unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
* (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
* several times.
*
* Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
* not yet been polled.)
*/
@Nullable
public Iterable<DataSegment> iterateAvailableDataSegments()
{
return metadataSegmentManager.iterateAllSegments();
@@ -643,10 +670,16 @@ public void run()
BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);

// Do coordinator stuff.
final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
if (dataSources == null) {
log.info("Metadata store not polled yet, skipping this run.");
return;
}

DruidCoordinatorRuntimeParams params =
DruidCoordinatorRuntimeParams.newBuilder()
.withStartTime(startTime)
.withDataSources(metadataSegmentManager.getDataSources())
.withDataSources(dataSources)
.withDynamicConfigs(getDynamicConfigs())
.withCompactionConfig(getCompactionConfig())
.withEmitter(emitter)
Expand All @@ -656,6 +689,11 @@ public void run()
// Don't read state and run state in the same helper otherwise racy conditions may exist
if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
params = helper.run(params);

if (params == null) {
// This helper wanted to cancel the run. No log message, since the helper should have logged a reason.
return;
}
}
}
}
@@ -33,6 +33,7 @@
import java.util.SortedSet;

/**
*
*/
public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
{
@@ -45,21 +46,12 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
Set<DataSegment> availableSegments = params.getAvailableSegments();
DruidCluster cluster = params.getDruidCluster();

if (availableSegments.isEmpty()) {
log.info(
"Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent " +
"a race condition in which the coordinator would drop all segments if it started running cleanup before " +
"it finished polling the metadata storage for available segments for the first time."
);
return params.buildFromExisting().withCoordinatorStats(stats).build();
}

// Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It might
// not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any
// segments at all, we should have all of them.)
// Note that if metadata store has no segments, then availableSegments will stay empty and nothing will be dropped.
// This is done to prevent a race condition in which the coordinator would drop all segments if it started running
// cleanup before it finished polling the metadata storage for available segments for the first time.
// Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It's
// also filled atomically, so if there are any segments at all, we should have all of them.)
//
// Note that if the metadata store has not been polled yet, "getAvailableSegments" would throw an error since
// "availableSegments" is null. But this won't happen, since the earlier helper "DruidCoordinatorSegmentInfoLoader"
Member:

IMO it's better to identify symbols in comments in the following ways instead of putting them into double quotes:

  • Adding () to the end of method names
  • Class names start with a capital and have CamelCase, so they don't need any extra identification. Same about variable names with camelCase.
  • Only single-word variable names may need to be identified, but IMO better to use backticks (`) instead of double quotes.

// would have canceled the run.
for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
for (ServerHolder serverHolder : serverHolders) {
ImmutableDruidServer server = serverHolder.getServer();
@@ -21,16 +21,24 @@

import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;

import javax.annotation.Nullable;

/**
*
*/
public interface DruidCoordinatorHelper
{
/**
* Implementations of this method run various activities performed by the coordinator.
* Input params can be used and modified. They are typically in a list and returned
* DruidCoordinatorRuntimeParams is passed to the next helper.
*
* @param params
* @return same as input or a modified value to be used by next helper.
*
* @return same as input or a modified value to be used by next helper. Null return
* values will prevent future DruidCoordinatorHelpers from running until the next
* cycle.
*/
@Nullable
DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params);
}