diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java
index c427f6a04ea6..fc092a5c25e7 100644
--- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java
+++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java
@@ -76,9 +76,22 @@ public static class PruneLoadSpecHolder
     @Inject(optional = true) @PruneLoadSpec boolean pruneLoadSpec = false;
   }
 
+  public static DataSegment intern(DataSegment dataSegment, boolean updateLoadSpec)
+  {
+    DataSegment result = DATA_SEGMENT_INTERNER.intern(dataSegment);
+    if (updateLoadSpec) {
+      result.dimensions = dataSegment.dimensions;
+      result.metrics = dataSegment.metrics;
+      result.loadSpec = dataSegment.loadSpec;
+    }
+    return result;
+  }
+
+  private static final Interner<String> STRING_INTERNER = Interners.newWeakInterner();
   private static final Interner<List<String>> DIMENSIONS_INTERNER = Interners.newWeakInterner();
   private static final Interner<List<String>> METRICS_INTERNER = Interners.newWeakInterner();
+  private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
   private static final Map<String, Object> PRUNED_LOAD_SPEC = ImmutableMap.of(
       "load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space",
       ""
   );
@@ -87,9 +100,9 @@ public static class PruneLoadSpecHolder
 
   private final Integer binaryVersion;
   private final SegmentId id;
   @Nullable
-  private final Map<String, Object> loadSpec;
-  private final List<String> dimensions;
-  private final List<String> metrics;
+  private volatile Map<String, Object> loadSpec;
+  private volatile List<String> dimensions;
+  private volatile List<String> metrics;
   private final ShardSpec shardSpec;
   private final long size;
diff --git a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
index 092925667625..b550a526c1f9 100644
--- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java
@@ -37,6 +37,7 @@
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -94,8 +95,22 @@ protected DruidServer addInnerInventory(
     return container;
   }
 
+  private Set<DataSegment> dataSegmentsIntern(Set<DataSegment> segments)
+  {
+    Set<DataSegment> result = new HashSet<>();
+    for (DataSegment dataSegment : segments) {
+      result.add(DataSegment.intern(dataSegment, false));
+    }
+    return result;
+  }
+
+
   private Set<DataSegment> filterInventory(final DruidServer container, Set<DataSegment> inventory)
   {
+    if (container.segmentReplicatable()) {
+      inventory = dataSegmentsIntern(inventory);
+    }
+
     Predicate<Pair<DruidServer, DataSegment>> predicate = Predicates.or(
         defaultFilter,
         Predicates.or(segmentPredicates.values())
@@ -135,7 +150,6 @@ public DataSegment apply(
   protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory)
   {
     Set<DataSegment> filteredInventory = filterInventory(container, inventory);
-
     Set<DataSegment> existing = zNodes.get(inventoryKey);
     if (existing == null) {
       throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey);
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
index 4f22478cd36a..b24d5027f99d 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
@@ -934,7 +934,7 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
           {
             try {
               DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
-              return replaceWithExistingSegmentIfPresent(segment);
+              return DataSegment.intern(segment, true);
             }
             catch (IOException e) {
               log.makeAlert(e, "Failed to read segment from db.").emit();
@@ -981,29 +981,6 @@ private static ImmutableMap<String, Object> createDefaultDataSourceProperties()
     return ImmutableMap.of("created", DateTimes.nowUtc().toString());
   }
 
-  /**
-   * For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough
-   * (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all
-   * existing segments each time, and then replace them in {@link #dataSourcesSnapshot}. This method allows to use
-   * already existing (old) segments when possible, effectively interning them a-la {@link String#intern} or {@link
-   * com.google.common.collect.Interner}, aiming to make the majority of {@link DataSegment} objects garbage soon after
-   * they are deserialized and to die in young generation. It allows to avoid fragmentation of the old generation and
-   * full GCs.
-   */
-  private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment)
-  {
-    @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = this.dataSourcesSnapshot;
-    if (dataSourcesSnapshot == null) {
-      return segment;
-    }
-    @Nullable ImmutableDruidDataSource dataSource = dataSourcesSnapshot.getDataSource(segment.getDataSource());
-    if (dataSource == null) {
-      return segment;
-    }
-    DataSegment alreadyExistingSegment = dataSource.getSegment(segment.getId());
-    return alreadyExistingSegment != null ? alreadyExistingSegment : segment;
-  }
-
   private String getSegmentsTable()
   {
     return dbTables.get().getSegmentsTable();