Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions core/src/main/java/org/apache/druid/timeline/DataSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,22 @@ public static class PruneLoadSpecHolder
@Inject(optional = true) @PruneLoadSpec boolean pruneLoadSpec = false;
}

public static DataSegment intern(DataSegment dataSegment, boolean updateLoadSpec)
{
DataSegment result = DATA_SEGMENT_INTERNER.intern(dataSegment);
if (updateLoadSpec) {
result.dimensions = dataSegment.dimensions;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think DataSegment shouldn't become mutable. However, it would be nice if you would solve this problem in this PR: #6358.

result.metrics = dataSegment.metrics;
result.loadSpec = dataSegment.loadSpec;
}
return result;
}


private static final Interner<String> STRING_INTERNER = Interners.newWeakInterner();
private static final Interner<List<String>> DIMENSIONS_INTERNER = Interners.newWeakInterner();
private static final Interner<List<String>> METRICS_INTERNER = Interners.newWeakInterner();
private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This humongous Map with millions of weak references would be a problem for GC in itself: see #6357 for context.

You should adopt the design with BatchServerInventoryView and SQLSegmentMetadataManager probing into each other's memory, similarly to what is explained here: #7395 (comment)

private static final Map<String, Object> PRUNED_LOAD_SPEC = ImmutableMap.of(
"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space",
""
Expand All @@ -87,9 +100,9 @@ public static class PruneLoadSpecHolder
private final Integer binaryVersion;
private final SegmentId id;
@Nullable
private final Map<String, Object> loadSpec;
private final List<String> dimensions;
private final List<String> metrics;
private volatile Map<String, Object> loadSpec;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making DataSegment non-immutable was ruled out in this discussion: #7571. Please read it in full. I proposed a solution here: #7571 (comment). Please check if you can implement it in this PR.

private volatile List<String> dimensions;
private volatile List<String> metrics;
private final ShardSpec shardSpec;
private final long size;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.timeline.DataSegment;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -94,8 +95,22 @@ protected DruidServer addInnerInventory(
return container;
}

/**
 * Returns a new set containing the canonical interned instance of each given segment (interned
 * via {@link DataSegment#intern} with {@code updateLoadSpec = false}). The input set is left
 * unmodified.
 */
private Set<DataSegment> dataSegmentsIntern(Set<DataSegment> segments)
{
  final Set<DataSegment> interned = new HashSet<>();
  segments.forEach(segment -> interned.add(DataSegment.intern(segment, false)));
  return interned;
}


private Set<DataSegment> filterInventory(final DruidServer container, Set<DataSegment> inventory)
{
if (container.segmentReplicatable()) {
inventory = dataSegmentsIntern(inventory);
}

Predicate<Pair<DruidServerMetadata, DataSegment>> predicate = Predicates.or(
defaultFilter,
Predicates.or(segmentPredicates.values())
Expand Down Expand Up @@ -135,7 +150,6 @@ public DataSegment apply(
protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory)
{
Set<DataSegment> filteredInventory = filterInventory(container, inventory);

Set<DataSegment> existing = zNodes.get(inventoryKey);
if (existing == null) {
throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE
{
try {
DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
return replaceWithExistingSegmentIfPresent(segment);
return DataSegment.intern(segment, true);
}
catch (IOException e) {
log.makeAlert(e, "Failed to read segment from db.").emit();
Expand Down Expand Up @@ -981,29 +981,6 @@ private static ImmutableMap<String, String> createDefaultDataSourceProperties()
return ImmutableMap.of("created", DateTimes.nowUtc().toString());
}

/**
 * Interns the given freshly deserialized segment against the segments already referenced from
 * {@link #dataSourcesSnapshot}, a-la {@link String#intern} or {@link
 * com.google.common.collect.Interner}. {@link #poll()} fetches and deserializes every existing
 * segment on each iteration; by reusing the already-alive (old generation) object when one exists,
 * the new duplicate becomes garbage soon after deserialization and dies in the young generation.
 * This avoids fragmentation of the old generation and full GCs, since the Java garbage collector
 * prefers new objects to be short-living while old-enough objects stay alive.
 */
private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment)
{
  @MonotonicNonNull DataSourcesSnapshot snapshot = this.dataSourcesSnapshot;
  if (snapshot != null) {
    @Nullable ImmutableDruidDataSource dataSource = snapshot.getDataSource(segment.getDataSource());
    if (dataSource != null) {
      DataSegment existingSegment = dataSource.getSegment(segment.getId());
      if (existingSegment != null) {
        return existingSegment;
      }
    }
  }
  return segment;
}

private String getSegmentsTable()
{
return dbTables.get().getSegmentsTable();
Expand Down