From 155d2421109098b8de9fa11b9902713cf93572bc Mon Sep 17 00:00:00 2001 From: dengfangyuan Date: Fri, 26 Jul 2019 17:50:44 +0800 Subject: [PATCH 1/2] DataSegment intern improvement --- .../apache/druid/timeline/DataSegment.java | 19 ++++++++++++++++--- .../client/BatchServerInventoryView.java | 16 +++++++++++++++- .../metadata/SQLMetadataSegmentManager.java | 2 +- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java index fb6a287fc3a5..2060472a39f9 100644 --- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java +++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java @@ -76,9 +76,22 @@ public static class PruneLoadSpecHolder @Inject(optional = true) @PruneLoadSpec boolean pruneLoadSpec = false; } + public static DataSegment intern(DataSegment dataSegment, boolean updateLoadSpec) + { + DataSegment result = DATA_SEGMENT_INTERNER.intern(dataSegment); + if (updateLoadSpec) { + result.dimensions = dataSegment.dimensions; + result.metrics = dataSegment.metrics; + result.loadSpec = dataSegment.loadSpec; + } + return result; + } + + private static final Interner<String> STRING_INTERNER = Interners.newWeakInterner(); private static final Interner<List<String>> DIMENSIONS_INTERNER = Interners.newWeakInterner(); private static final Interner<List<String>> METRICS_INTERNER = Interners.newWeakInterner(); + private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner(); private static final Map<String, Object> PRUNED_LOAD_SPEC = ImmutableMap.of( "load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space", "" ); private final Integer binaryVersion; private final SegmentId id; @Nullable - private final Map<String, Object> loadSpec; - private final List<String> dimensions; - private final List<String> metrics; + private volatile Map<String, Object> loadSpec; + private volatile List<String> dimensions; + private volatile List<String> 
metrics; private final ShardSpec shardSpec; private final long size; diff --git a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java index 092925667625..b550a526c1f9 100644 --- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java @@ -37,6 +37,7 @@ import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.timeline.DataSegment; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -94,8 +95,22 @@ protected DruidServer addInnerInventory( return container; } + private Set<DataSegment> dataSegmentsIntern(Set<DataSegment> segments) + { + Set<DataSegment> result = new HashSet<>(); + for (DataSegment dataSegment : segments) { + result.add(DataSegment.intern(dataSegment, false)); + } + return result; + } + + private Set<DataSegment> filterInventory(final DruidServer container, Set<DataSegment> inventory) { + if (container.segmentReplicatable()) { + inventory = dataSegmentsIntern(inventory); + } + Predicate<Pair<DruidServerMetadata, DataSegment>> predicate = Predicates.or( defaultFilter, Predicates.or(segmentPredicates.values()) @@ -135,7 +150,6 @@ public DataSegment apply( protected DruidServer updateInnerInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory) { Set<DataSegment> filteredInventory = filterInventory(container, inventory); - Set<DataSegment> existing = zNodes.get(inventoryKey); if (existing == null) { throw new ISE("Trying to update an inventoryKey[%s] that didn't exist?!", inventoryKey); diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index bb8514291df1..d50b30f84187 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java 
@@ -677,7 +677,7 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE { try { DataSegment segment = jsonMapper.readValue(r.getBytes("payload"), DataSegment.class); - return replaceWithExistingSegmentIfPresent(segment); + return DataSegment.intern(segment, true); } catch (IOException e) { log.makeAlert(e, "Failed to read segment from db.").emit(); From c5a1eed792e0d3725f3c64ca8c58983524a55128 Mon Sep 17 00:00:00 2001 From: dengfangyuan Date: Fri, 26 Jul 2019 18:32:11 +0800 Subject: [PATCH 2/2] delete unused replaceWithExistingSegmentIfPresent --- .../metadata/SQLMetadataSegmentManager.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java index d50b30f84187..032097b96222 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java @@ -726,27 +726,6 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLE dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources); } - /** - * For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough - * (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all - * existing segments each time, and then replace them in {@link #dataSourcesSnapshot}. This method allows to use already - * existing (old) segments when possible, effectively interning them a-la {@link String#intern} or {@link - * com.google.common.collect.Interner}, aiming to make the majority of {@link DataSegment} objects garbage soon after - * they are deserialized and to die in young generation. It allows to avoid fragmentation of the old generation and - * full GCs. 
- */ - private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment) - { - ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot) - .map(m -> m.getDataSourcesMap().get(segment.getDataSource())) - .orElse(null); - if (dataSource == null) { - return segment; - } - DataSegment alreadyExistingSegment = dataSource.getSegment(segment.getId()); - return alreadyExistingSegment != null ? alreadyExistingSegment : segment; - } - private String getSegmentsTable() { return dbTables.get().getSegmentsTable();