Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public ShardSpec apply(PartitionChunk<ServerSelector> input)
Hasher hasher = Hashing.sha1().newHasher();
boolean hasOnlyHistoricalSegments = true;
for (Pair<ServerSelector, SegmentDescriptor> p : segments) {
if (!p.lhs.pick().getServer().isAssignable()) {
if (!p.lhs.pick().getServer().segmentReplicatable()) {
hasOnlyHistoricalSegments = false;
break;
}
Expand Down Expand Up @@ -429,7 +429,7 @@ private void addSequencesFromServer(ArrayList<Sequence<T>> listOfSequences)
final MultipleSpecificSegmentSpec segmentSpec = new MultipleSpecificSegmentSpec(descriptors);

final Sequence<T> resultSeqToAdd;
if (!server.isAssignable() || !populateCache || isBySegment) { // Direct server queryable
if (!server.segmentReplicatable() || !populateCache || isBySegment) { // Direct server queryable
if (!isBySegment) {
resultSeqToAdd = clientQueryable.run(queryPlus.withQuerySegmentSpec(segmentSpec), responseContext);
} else {
Expand Down
7 changes: 4 additions & 3 deletions server/src/main/java/io/druid/client/DruidServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;

import java.util.Collections;
Expand Down Expand Up @@ -111,7 +112,7 @@ public long getMaxSize()
return metadata.getMaxSize();
}

public String getType()
public ServerType getType()
{
return metadata.getType();
}
Expand All @@ -121,9 +122,9 @@ public String getTier()
return metadata.getTier();
}

public boolean isAssignable()
public boolean segmentReplicatable()
{
return metadata.isAssignable();
return metadata.segmentReplicatable();
}

public int getPriority()
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/io/druid/client/ImmutableDruidDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,15 @@ public Set<DataSegment> getSegments()
{
return segmentsHolder;
}

@Override
public String toString()
{
// partitionNames is intentionally ignored because it is usually large
return "ImmutableDruidDataSource{"
+ "name='" + name
+ "', segments='" + segmentsHolder
+ "', properties='" + properties
+ "'}";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't include partitionNames on purpose?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, partitionNames usually includes a lot of partitions.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please leave a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

}
}
14 changes: 13 additions & 1 deletion server/src/main/java/io/druid/client/ImmutableDruidServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableMap;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;

import java.util.Map;
Expand Down Expand Up @@ -72,7 +73,7 @@ public long getMaxSize()
return metadata.getMaxSize();
}

public String getType()
public ServerType getType()
{
return metadata.getType();
}
Expand Down Expand Up @@ -106,4 +107,15 @@ public Map<String, DataSegment> getSegments()
{
return segments;
}

@Override
public String toString()
{
// segments is intentionally ignored because it is usually large
return "ImmutableDruidServer{"
+ "meta='" + metadata
+ "', size='" + currSize
+ "', sources='" + dataSources
+ "'}";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't include segments on purpose?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a server usually holds a lot of segments.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please leave a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.segment.realtime;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
Expand Down Expand Up @@ -84,6 +85,7 @@ public RealtimeManager(
this(fireDepartments, conglomerate, serverAnnouncer, Maps.newHashMap());
}

@VisibleForTesting
RealtimeManager(
List<FireDepartment> fireDepartments,
QueryRunnerFactoryConglomerate conglomerate,
Expand All @@ -94,7 +96,7 @@ public RealtimeManager(
this.fireDepartments = fireDepartments;
this.conglomerate = conglomerate;
this.serverAnnouncer = serverAnnouncer;
this.chiefs = chiefs;
this.chiefs = chiefs == null ? Maps.newHashMap() : Maps.newHashMap(chiefs);
}

@LifecycleStart
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ static boolean isHandOffComplete(List<ImmutableSegmentLoadInfo> serverView, Segm
@Override
public boolean apply(DruidServerMetadata input)
{
return input.isAssignable();
return input.segmentReplicatable();
}
}
)) {
Expand Down
196 changes: 196 additions & 0 deletions server/src/main/java/io/druid/server/SegmentManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server;

import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.collections.CountingMap;
import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.Segment;
import io.druid.segment.loading.SegmentLoader;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class SegmentManager
{
private static final EmittingLogger log = new EmittingLogger(SegmentManager.class);

private final Object lock = new Object();
private final SegmentLoader segmentLoader;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> dataSources = new HashMap<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be one ConcurrentHashMap<String, DataSourceState>, where DataSourceState includes VersionedIntervalTimeline<String, ReferenceCountingSegment>, size and count. Then all synchronization could be delegated to ConcurrentHashMap methods, no explicit synchronized and locks are needed in SegmentManager. Also ConcurrentHashMap's concurrency is better than synchronization on a single object.

Copy link
Copy Markdown
Contributor Author

@jihoonson jihoonson May 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SegmentManager is simply separated from ServerManager. Your comments around SegmentManager and ServerManager look good, but I think it is not a part of this pr. Maybe better to raise a new issue after this pr.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok but would be nice if you could do this as part of this PR, it's 15 min of work, another issue and a separate PR will eat more of everybody's attention

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the issues which are less related to the original issues, I think it would be fine to fix them in a single PR if the changes are truly little and intuitive, but otherwise, it's better to separate to several PRs even though some PRs will be quite small. This is because

  1. Authors and reviewers can focus on the original issues of the PR. This will increase review speed.
  2. As you know, most changes require adding tests even when they are simple. This can be a burden for authors and slows development down.
  3. I think getting other people's attention will be good because they can review PRs from other points of view.

Besides, even in the world you mentioned, a lock is needed anyway for synchronization for accessing ConcurrentHashMap and accessing/mutating DataSourceState taken from ConcurrentHashMap (please see loadSegment()). For example,

private final ConcurrentHashMap<String, DataSourceState> dataSources;
...
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
{
  synchronized (lock) {
    final DataSourceState dataSourceState = dataSources.get(dataSource);
    final PartitionHolder<ReferenceCountingSegment> entry = dataSourceState.findEntry(segment.getInterval(), segment.getVersion);
    ...
    dataSourceState.add(
        segment.getInterval(),
        segment.getVersion(),
        segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
    );
    ...
  }
}

For this issue, I prefer to investigate first what the exact requirements are.

Copy link
Copy Markdown
Member

@leventov leventov May 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok for not including refactoring into this PR.

Synchronization that you mentioned still needed could be managed by ConcurrentHashMap:

dataSources.compute(dataSource, dataSourceState -> {
  if (dataSourceState == null) ...
  else {
    dataSourceState.add(...);
    return dataSourceState;
  }
});

ConcurrentHashMap guarantees that executions of lambdas provided to compute(), computeIfAbsent(), merge() etc. are linearizable. Internally it is implemented via the same intrinsic locks that you use explicitly, but striped over the ConcurrentHashMap entries.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right. That way should work.
Thanks for your understanding. I'll raise a new PR after this PR.

private final CountingMap<String> dataSourceSizes = new CountingMap<>();
private final CountingMap<String> dataSourceCounts = new CountingMap<>();

/**
 * Creates a SegmentManager that delegates the physical loading/cleanup of
 * segment files to the given {@link SegmentLoader}.
 */
@Inject
public SegmentManager(
SegmentLoader segmentLoader
)
{
this.segmentLoader = segmentLoader;
}

/**
 * Returns a point-in-time snapshot of the total loaded segment size (in bytes) per dataSource.
 */
public Map<String, Long> getDataSourceSizes()
{
// dataSourceSizes uses its own monitor; the writers in loadSegment()/dropSegment()
// synchronize on the same object.
synchronized (dataSourceSizes) {
return dataSourceSizes.snapshot();
}
}

/**
 * Returns a point-in-time snapshot of the number of loaded segments per dataSource.
 */
public Map<String, Long> getDataSourceCounts()
{
// dataSourceCounts uses its own monitor; the writers in loadSegment()/dropSegment()
// synchronize on the same object.
synchronized (dataSourceCounts) {
return dataSourceCounts.snapshot();
}
}

/**
 * Checks whether the given segment is already present in the local segment cache.
 * This consults only the {@link SegmentLoader}; it does not inspect the in-memory timelines.
 *
 * @throws SegmentLoadingException if the loader fails while checking
 */
public boolean isSegmentCached(final DataSegment segment) throws SegmentLoadingException
{
return segmentLoader.isSegmentLoaded(segment);
}

/**
 * Returns the timeline of loaded segments for the given dataSource, or null if no
 * segment of that dataSource has been loaded yet.
 */
public VersionedIntervalTimeline<String, ReferenceCountingSegment> getTimeline(String dataSource)
{
synchronized (lock) {
return dataSources.get(dataSource);
}
}

/**
 * Load a single segment.
 *
 * @param segment segment to load
 *
 * @return true if the segment was newly loaded, false if it was already loaded
 *
 * @throws SegmentLoadingException if the segment cannot be loaded
 */
public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException
{
  // Materialize the segment outside the lock; loading may be expensive and should
  // not block concurrent readers of the timelines.
  final Segment adapter = getAdapter(segment);

  synchronized (lock) {
    final String dataSource = segment.getDataSource();
    final VersionedIntervalTimeline<String, ReferenceCountingSegment> loadedIntervals = dataSources.computeIfAbsent(
        dataSource,
        k -> new VersionedIntervalTimeline<>(Ordering.natural())
    );

    final PartitionHolder<ReferenceCountingSegment> entry = loadedIntervals.findEntry(
        segment.getInterval(),
        segment.getVersion()
    );
    if ((entry != null) && (entry.getChunk(segment.getShardSpec().getPartitionNum()) != null)) {
      // NOTE(review): on this duplicate path the adapter built above is neither closed nor
      // cleaned up here — presumably the loader caches it; confirm this does not leak.
      log.warn("Told to load an adapter for a segment[%s] that already exists", segment.getIdentifier());
      return false;
    }

    loadedIntervals.add(
        segment.getInterval(),
        segment.getVersion(),
        segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
    );
    // The size/count maps use their own monitors, consistent with
    // getDataSourceSizes()/getDataSourceCounts().
    synchronized (dataSourceSizes) {
      dataSourceSizes.add(dataSource, segment.getSize());
    }
    synchronized (dataSourceCounts) {
      dataSourceCounts.add(dataSource, 1L);
    }
    return true;
  }
}

/**
 * Resolves the queryable {@link Segment} for the given descriptor via the
 * {@link SegmentLoader}. If loading fails, any partially materialized data is
 * cleaned up before the original exception is rethrown.
 *
 * @throws SegmentLoadingException if loading fails or the loader yields a null adapter
 */
private Segment getAdapter(final DataSegment segment) throws SegmentLoadingException
{
  final Segment loaded;
  try {
    loaded = segmentLoader.getSegment(segment);
  }
  catch (SegmentLoadingException loadFailure) {
    // Best-effort cleanup of whatever the loader left behind; a cleanup failure
    // must not mask the original loading exception.
    try {
      segmentLoader.cleanup(segment);
    }
    catch (SegmentLoadingException cleanupFailure) {
      loadFailure.addSuppressed(cleanupFailure);
    }
    throw loadFailure;
  }

  if (loaded == null) {
    throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec());
  }
  return loaded;
}

/**
 * Drops a segment: removes it from its dataSource's timeline, adjusts the
 * per-dataSource size/count bookkeeping, closes the underlying queryable, and
 * finally asks the loader to clean up local files.
 *
 * @throws SegmentLoadingException if the loader's cleanup fails
 */
public void dropSegment(final DataSegment segment) throws SegmentLoadingException
{
  final String dataSource = segment.getDataSource();

  synchronized (lock) {
    final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = dataSources.get(dataSource);
    if (timeline == null) {
      // Unknown dataSource: nothing to drop. Note that segmentLoader.cleanup() is
      // deliberately skipped on this path (matches prior behavior).
      log.info("Told to delete a queryable for a dataSource[%s] that doesn't exist.", dataSource);
      return;
    }

    final PartitionChunk<ReferenceCountingSegment> removedChunk = timeline.remove(
        segment.getInterval(),
        segment.getVersion(),
        segment.getShardSpec().createChunk(null)
    );

    if (removedChunk == null || removedChunk.getObject() == null) {
      log.info(
          "Told to delete a queryable on dataSource[%s] for interval[%s] and version [%s] that I don't have.",
          dataSource,
          segment.getInterval(),
          segment.getVersion()
      );
    } else {
      final ReferenceCountingSegment oldQueryable = removedChunk.getObject();

      // Bookkeeping maps use their own monitors, mirroring loadSegment().
      synchronized (dataSourceSizes) {
        dataSourceSizes.add(dataSource, -segment.getSize());
      }
      synchronized (dataSourceCounts) {
        dataSourceCounts.add(dataSource, -1L);
      }

      try {
        log.info("Attempting to close segment %s", segment.getIdentifier());
        oldQueryable.close();
      }
      catch (IOException e) {
        // Closing failures are alert-worthy but must not prevent local cleanup below.
        log.makeAlert(e, "Exception closing segment")
           .addData("dataSource", dataSource)
           .addData("segmentId", segment.getIdentifier())
           .emit();
      }
    }
  }

  segmentLoader.cleanup(segment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ private String makeServedSegmentPath()
return makeServedSegmentPath(
UUIDUtils.generateUuid(
server.getHost(),
server.getType(),
server.getType().toString(),
server.getTier(),
new DateTime().toString()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class DruidServerMetadata
private final String host;
private final long maxSize;
private final String tier;
private final String type;
private final ServerType type;
private final int priority;

@JsonCreator
Expand All @@ -47,7 +47,7 @@ public DruidServerMetadata(
this.host = host;
this.maxSize = maxSize;
this.tier = tier;
this.type = type;
this.type = ServerType.fromString(type);
this.priority = priority;
}

Expand Down Expand Up @@ -76,7 +76,7 @@ public String getTier()
}

@JsonProperty
public String getType()
public ServerType getType()
{
return type;
}
Expand All @@ -87,9 +87,9 @@ public int getPriority()
return priority;
}

public boolean isAssignable()
public boolean segmentReplicatable()
{
return getType().equalsIgnoreCase("historical") || getType().equalsIgnoreCase("bridge");
return type.isSegmentReplicationTarget();
}

@Override
Expand Down
Loading