Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.sql.calcite.schema;

import com.amazonaws.annotation.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
Expand Down Expand Up @@ -63,7 +64,6 @@
import org.apache.druid.sql.calcite.view.DruidViewMacro;
import org.apache.druid.sql.calcite.view.ViewManager;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;

import java.io.IOException;
import java.util.Comparator;
Expand Down Expand Up @@ -95,8 +95,9 @@ public class DruidSchema extends AbstractSchema

private static final EmittingLogger log = new EmittingLogger(DruidSchema.class);
private static final int MAX_SEGMENTS_PER_QUERY = 15000;
private static final long IS_PUBLISHED = 0;
private static final long IS_AVAILABLE = 1;
private static final long DEFAULT_IS_PUBLISHED = 0;
private static final long DEFAULT_IS_AVAILABLE = 1;
private static final long DEFAULT_NUM_ROWS = 0;

private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerConfig config;
Expand All @@ -107,12 +108,12 @@ public class DruidSchema extends AbstractSchema
// For awaitInitialization.
private final CountDownLatch initialized = new CountDownLatch(1);

// Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized
// Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized, segmentMetadata
private final Object lock = new Object();

// DataSource -> Segment -> SegmentMetadataHolder(contains RowSignature) for that segment.
// Use TreeMap for segments so they are merged in deterministic order, from older to newer.
// This data structure need to be accessed in a thread-safe way since SystemSchema accesses it
@GuardedBy("lock")
private final Map<String, TreeMap<DataSegment, SegmentMetadataHolder>> segmentMetadataInfo = new HashMap<>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this comment, you could add @GuardedBy("lock"). It sends the same message, and could also help with automated bug detection.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion, thanks.

private int totalSegments = 0;

Expand Down Expand Up @@ -351,7 +352,8 @@ protected Multimap<String, org.apache.calcite.schema.Function> getFunctionMultim
return builder.build();
}

private void addSegment(final DruidServerMetadata server, final DataSegment segment)
@VisibleForTesting
void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
synchronized (lock) {
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
Expand All @@ -360,16 +362,18 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm
// segmentReplicatable is used to determine if segments are served by realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;

final Map<SegmentId, Set<String>> serverSegmentMap = ImmutableMap.of(
final Set<String> servers = ImmutableSet.of(server.getName());
holder = SegmentMetadataHolder.builder(
segment.getId(),
ImmutableSet.of(server.getName())
);

holder = SegmentMetadataHolder
.builder(segment.getId(), IS_PUBLISHED, IS_AVAILABLE, isRealtime, serverSegmentMap)
.build();
DEFAULT_IS_PUBLISHED,
DEFAULT_IS_AVAILABLE,
isRealtime,
servers,
null,
DEFAULT_NUM_ROWS
).build();
// Unknown segment.
setSegmentSignature(segment, holder);
setSegmentMetadataHolder(segment, holder);
segmentsNeedingRefresh.add(segment);
if (!server.segmentReplicatable()) {
log.debug("Added new mutable segment[%s].", segment.getId());
Expand All @@ -378,14 +382,14 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm
log.debug("Added new immutable segment[%s].", segment.getId());
}
} else {
final Map<SegmentId, Set<String>> segmentServerMap = holder.getReplicas();
final Set<String> segmentServers = holder.getReplicas();
final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
.addAll(segmentServerMap.get(segment.getId()))
.addAll(segmentServers)
.add(server.getName())
.build();
final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
.from(holder)
.withReplicas(ImmutableMap.of(segment.getId(), servers))
.withReplicas(servers)
.build();
knownSegments.put(segment, holderWithNumReplicas);
if (server.segmentReplicatable()) {
Expand All @@ -404,7 +408,7 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm
}

@VisibleForTesting
protected void removeSegment(final DataSegment segment)
void removeSegment(final DataSegment segment)
{
synchronized (lock) {
log.debug("Segment[%s] is gone.", segment.getId());
Expand Down Expand Up @@ -435,13 +439,13 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSeg
log.debug("Segment[%s] is gone from server[%s]", segment.getId(), server.getName());
final Map<DataSegment, SegmentMetadataHolder> knownSegments = segmentMetadataInfo.get(segment.getDataSource());
final SegmentMetadataHolder holder = knownSegments.get(segment);
final Map<SegmentId, Set<String>> segmentServerMap = holder.getReplicas();
final ImmutableSet<String> servers = FluentIterable.from(segmentServerMap.get(segment.getId()))
final Set<String> segmentServers = holder.getReplicas();
final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
.filter(Predicates.not(Predicates.equalTo(server.getName())))
.toSet();
final SegmentMetadataHolder holderWithNumReplicas = SegmentMetadataHolder
.from(holder)
.withReplicas(ImmutableMap.of(segment.getId(), servers))
.withReplicas(servers)
.build();
knownSegments.put(segment, holderWithNumReplicas);
lock.notifyAll();
Expand All @@ -453,7 +457,7 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSeg
* which may be a subset of the asked-for set.
*/
@VisibleForTesting
protected Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
Set<DataSegment> refreshSegments(final Set<DataSegment> segments) throws IOException
{
final Set<DataSegment> retVal = new HashSet<>();

Expand Down Expand Up @@ -525,7 +529,7 @@ private Set<DataSegment> refreshSegmentsForDataSource(final String dataSource, f
.withNumRows(analysis.getNumRows())
.build();
dataSourceSegments.put(segment, updatedHolder);
setSegmentSignature(segment, updatedHolder);
setSegmentMetadataHolder(segment, updatedHolder);
retVal.add(segment);
}
}
Expand All @@ -550,7 +554,8 @@ private Set<DataSegment> refreshSegmentsForDataSource(final String dataSource, f
return retVal;
}

private void setSegmentSignature(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
@VisibleForTesting
void setSegmentMetadataHolder(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
{
synchronized (lock) {
TreeMap<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.computeIfAbsent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.druid.timeline.SegmentId;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;

/**
Expand All @@ -36,15 +35,25 @@ public static Builder builder(
long isPublished,
long isAvailable,
long isRealtime,
Map<SegmentId, Set<String>> segmentServerMap
Set<String> segmentServers,
RowSignature rowSignature,
long numRows
)
{
return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServerMap);
return new Builder(segmentId, isPublished, isAvailable, isRealtime, segmentServers, rowSignature, numRows);
}

public static Builder from(SegmentMetadataHolder h)
{
return new Builder(h.getSegmentId(), h.isPublished(), h.isAvailable(), h.isRealtime(), h.getReplicas());
return new Builder(
h.getSegmentId(),
h.isPublished(),
h.isAvailable(),
h.isRealtime(),
h.getReplicas(),
h.getRowSignature(),
h.getNumRows()
);
}

private final SegmentId segmentId;
Expand All @@ -54,8 +63,8 @@ public static Builder from(SegmentMetadataHolder h)
private final long isPublished;
private final long isAvailable;
private final long isRealtime;
//segmentId -> set of servers that contain the segment
private final Map<SegmentId, Set<String>> segmentServerMap;
// set of servers that contain the segment
private final Set<String> segmentServers;
private final long numRows;
@Nullable
private final RowSignature rowSignature;
Expand All @@ -66,7 +75,7 @@ private SegmentMetadataHolder(Builder builder)
this.isPublished = builder.isPublished;
this.isAvailable = builder.isAvailable;
this.isRealtime = builder.isRealtime;
this.segmentServerMap = builder.segmentServerMap;
this.segmentServers = builder.segmentServers;
this.numRows = builder.numRows;
this.segmentId = builder.segmentId;
}
Expand All @@ -91,14 +100,14 @@ public SegmentId getSegmentId()
return segmentId;
}

public Map<SegmentId, Set<String>> getReplicas()
public Set<String> getReplicas()
{
return segmentServerMap;
return segmentServers;
}

public long getNumReplicas(SegmentId segmentId)
public long getNumReplicas()
{
return segmentServerMap.get(segmentId).size();
return segmentServers.size();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we return long in this method when Set#size() returns an int? Might not be worth changing because it might blow up other stuff.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It used to be int, and then got changed to long at some point. The reason is because we support nulls for long here, but not for ints ? Not very sure...but will likely leave it to be long here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

}

public long getNumRows()
Expand All @@ -119,7 +128,7 @@ public static class Builder
private final long isAvailable;
private final long isRealtime;

private Map<SegmentId, Set<String>> segmentServerMap;
private Set<String> segmentServers;
@Nullable
private RowSignature rowSignature;
private long numRows;
Expand All @@ -129,14 +138,18 @@ private Builder(
long isPublished,
long isAvailable,
long isRealtime,
Map<SegmentId, Set<String>> segmentServerMap
Set<String> servers,
RowSignature rowSignature,
long numRows
)
{
this.segmentId = segmentId;
this.isPublished = isPublished;
this.isAvailable = isAvailable;
this.isRealtime = isRealtime;
this.segmentServerMap = segmentServerMap;
this.segmentServers = servers;
this.rowSignature = rowSignature;
this.numRows = numRows;
}

public Builder withRowSignature(RowSignature rowSignature)
Expand All @@ -151,9 +164,9 @@ public Builder withNumRows(long numRows)
return this;
}

public Builder withReplicas(Map<SegmentId, Set<String>> segmentServerMap)
public Builder withReplicas(Set<String> servers)
{
this.segmentServerMap = segmentServerMap;
this.segmentServers = servers;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public Enumerable<Object[]> scan(DataContext root)
Maps.newHashMapWithExpectedSize(druidSchema.getTotalSegments());
for (SegmentMetadataHolder h : availableSegmentMetadata.values()) {
PartialSegmentData partialSegmentData =
new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(h.getSegmentId()), h.getNumRows());
new PartialSegmentData(h.isAvailable(), h.isRealtime(), h.getNumReplicas(), h.getNumRows());
partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData);
}

Expand Down
Loading