Make realtimes available for loading segments #4148
Conversation
{
  return "ImmutableDruidDataSource{"
         + "name='" + name
         + "', segments='" + segmentsHolder
Druid's usual toString() pattern doesn't include the ' quotes.
Hmm, actually I followed some other toString() implementations like DataSegment or SegmentDescriptor. It seems we first need to settle on a standard for toString().
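For illustration, a minimal, hypothetical sketch of the single-quoted toString() style being followed here (the class and field names below are made up, not from the PR):

```java
// Hypothetical class illustrating the quoted-field toString() style
// used by e.g. DataSegment; the names here are illustrative only.
public class ToStringSketch
{
  private final String name = "wiki";
  private final long size = 100;

  @Override
  public String toString()
  {
    return "ToStringSketch{" +
           "name='" + name + '\'' +
           ", size=" + size +
           '}';
  }
}
```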
  + "name='" + name
  + "', segments='" + segmentsHolder
  + "', properties='" + properties
  + "'}";
Doesn't include partitionNames on purpose?
Yes, partitionNames usually includes a lot of partitions.
public String toString()
{
  return "ImmutableDruidServer{"
         + "meta='" + metadata
         + "', size='" + currSize
         + "', sources='" + dataSources
         + "'}";
Doesn't include segments on purpose?
Yes, a server usually holds a lot of segments.
 */
private final Map<String, Map<Integer, FireChief>> chiefs;

private final SegmentManager segmentManager;
This field is created but not used
Ah, I added this field for future use, but it would be fine to add it later. I removed it for now.
private final Object lock = new Object();
private final SegmentLoader segmentLoader;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources = new HashMap<>();
There could be one ConcurrentHashMap<String, DataSourceState>, where DataSourceState includes VersionedIntervalTimeline<String, ReferenceCountingSegment>, size and count. Then all synchronization could be delegated to ConcurrentHashMap methods, no explicit synchronized and locks are needed in SegmentManager. Also ConcurrentHashMap's concurrency is better than synchronization on a single object.
SegmentManager is simply separated from ServerManager. Your comments around SegmentManager and ServerManager look good, but I think they are not a part of this PR. Maybe it's better to raise a new issue after this PR.
Ok, but it would be nice if you could do this as part of this PR; it's 15 minutes of work, while another issue and a separate PR will eat more of everybody's attention.
For issues which are less related to the original issue, I think it's fine to fix them in a single PR if the changes are truly small and intuitive; otherwise, it's better to split them into several PRs, even if some of those PRs are quite small. This is because
- Authors and reviewers can focus on the original issue of the PR. This will increase review speed.
- As you know, most changes require adding tests even when they are simple. This can be a burden for authors and slows development down.
- I think getting other people's attention is good because they can review PRs from other points of view.
Besides, even in the approach you mentioned, a lock is still needed to synchronize access to the ConcurrentHashMap and to the DataSourceState taken from it (please see loadSegment()). For example,

private final ConcurrentHashMap<String, DataSourceState> dataSources;
...
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
{
  synchronized (lock) {
    final DataSourceState dataSourceState = dataSources.get(dataSource);
    final PartitionHolder<ReferenceCountingSegment> entry = dataSourceState.findEntry(segment.getInterval(), segment.getVersion());
    ...
    dataSourceState.add(
        segment.getInterval(),
        segment.getVersion(),
        segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
    );
    ...
  }
}

For this issue, I prefer to investigate first what the exact requirements are.
Ok for not including refactoring into this PR.
The synchronization that you mentioned is still needed, but it could be managed by ConcurrentHashMap:

dataSources.compute(dataSource, (ds, dataSourceState) -> {
  if (dataSourceState == null) ...
  else {
    dataSourceState.add(...);
    return dataSourceState;
  }
});

ConcurrentHashMap guarantees that executions of the lambdas provided to compute(), computeIfAbsent(), merge() etc. are linearizable. Internally this is implemented via the same intrinsic locks that you use explicitly, but striped over the ConcurrentHashMap entries.
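To make the point concrete, here is a self-contained, hypothetical sketch of this pattern. DataSourceState is simplified here to a segment list plus a size counter; the real class would hold the VersionedIntervalTimeline as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of per-data-source state kept in a ConcurrentHashMap.
// compute() runs its lambda atomically per key, so no external lock is needed.
public class ComputeSketch
{
  public static class DataSourceState
  {
    final List<String> segmentIds = new ArrayList<>();
    long totalSize;
  }

  private final ConcurrentHashMap<String, DataSourceState> dataSources = new ConcurrentHashMap<>();

  public void addSegment(String dataSource, String segmentId, long size)
  {
    // The remapping function is a BiFunction of (key, currentValue);
    // a null currentValue means the data source is not yet known.
    dataSources.compute(dataSource, (key, state) -> {
      if (state == null) {
        state = new DataSourceState();
      }
      state.segmentIds.add(segmentId);
      state.totalSize += size;
      return state;
    });
  }

  public long totalSize(String dataSource)
  {
    DataSourceState state = dataSources.get(dataSource);
    return state == null ? 0 : state.totalSize;
  }
}
```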
Ah, right. That way should work.
Thanks for your understanding. I'll raise a new PR after this PR.
String dataSourceName = getDataSourceName(dataSource);

final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSourceName);
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getDataSources()
getDataSources() creates a copy; please add a method to SegmentManager to extract the VersionedIntervalTimeline by dataSource.
Thanks. I added a getTimeline() method.
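A hedged sketch of what such an accessor could look like. Timeline below is a stand-in for VersionedIntervalTimeline<String, ReferenceCountingSegment>, and the class is simplified; this is not the actual Druid code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified sketch of the SegmentManager accessors discussed above.
public class SegmentManagerSketch
{
  public static class Timeline {}

  private final Map<String, Timeline> dataSources = new HashMap<>();

  public void addDataSource(String name)
  {
    dataSources.put(name, new Timeline());
  }

  // Returns a defensive copy; callers that only need one timeline
  // pay for copying the whole map.
  public Map<String, Timeline> getDataSources()
  {
    return new HashMap<>(dataSources);
  }

  // Direct lookup avoids the copy; returns null for an unknown data source.
  public Timeline getTimeline(String dataSourceName)
  {
    return dataSources.get(dataSourceName);
  }
}
```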
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(
    dataSourceName
);
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = segmentManager.getDataSources()
 *
 * @see io.druid.server.coordinator.rules.LoadRule
 */
boolean segmentReplicatable()
Maybe isSegmentReplicationTarget()?
 *
 * @return true if it is available for broadcast.
 */
boolean segmentBroadcastable()
Maybe isSegmentBroadcastTarget()?
@jihoonson please resolve conflicts. Going to merge this PR tomorrow unless somebody else wants to review.
@leventov thanks. Resolved conflicts.
return cluster.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
return historicals.values().stream()
                  .flatMap(Collection::stream)
                  .collect(() -> realtimes, Set::add, Set::addAll);
Why does getAllServers() add elements to the realtimes collection?
Thanks for catching it!
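A minimal sketch of the problem and one possible fix. The types and server names below are simplified stand-ins, not the actual Druid types: collect(() -> realtimes, Set::add, Set::addAll) hands the live realtimes set to the collector as its container, so calling the method mutates realtimes as a side effect.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative sketch of the getAllServers() issue discussed above.
public class AllServersSketch
{
  public static Set<String> getAllServersBuggy(
      Map<String, Collection<String>> historicals,
      Set<String> realtimes
  )
  {
    // BUG: the supplier returns the live realtimes set, which then
    // accumulates every historical server as a side effect.
    return historicals.values().stream()
                      .flatMap(Collection::stream)
                      .collect(() -> realtimes, Set::add, Set::addAll);
  }

  public static Set<String> getAllServersFixed(
      Map<String, Collection<String>> historicals,
      Set<String> realtimes
  )
  {
    // Fix: concatenate both sources into a fresh set; neither input is mutated.
    return Stream.concat(
        historicals.values().stream().flatMap(Collection::stream),
        realtimes.stream()
    ).collect(Collectors.toSet());
  }
}
```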
Part of #4032.
In this patch, I added SegmentManager which is separated from ServerManager and is responsible for loading and dropping segments for a node. This SegmentManager is added to both historicals and realtimes.
#4077 introduces BroadcastRule for join processing. I'll extend BroadcastRule to apply to realtimes after it is merged.