From 2ddcccc0845017b6c360c5131b24323fac4e366b Mon Sep 17 00:00:00 2001 From: Nishant Date: Mon, 28 Mar 2016 15:30:29 -0700 Subject: [PATCH] Help relieve some memory pressure on brokers & coordinator Help relieve some memory pressure on brokers & master when it has to manage large number of segments. fix tests review comments --- .../java/io/druid/timeline/DataSegment.java | 28 +++++++++---------- .../timeline/partition/NoneShardSpec.java | 6 ++++ .../timeline/partition/NoneShardSpecTest.java | 16 +++++++++++ .../indexer/BatchDeltaIngestionTest.java | 4 +-- .../druid/indexer/IndexGeneratorJobTest.java | 6 ++-- 5 files changed, 40 insertions(+), 20 deletions(-) diff --git a/api/src/main/java/io/druid/timeline/DataSegment.java b/api/src/main/java/io/druid/timeline/DataSegment.java index ec9f39bc81f1..5db899f0a1b1 100644 --- a/api/src/main/java/io/druid/timeline/DataSegment.java +++ b/api/src/main/java/io/druid/timeline/DataSegment.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; @@ -31,6 +30,8 @@ import com.google.common.collect.Interner; import com.google.common.collect.Interners; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; import com.metamx.common.Granularity; import io.druid.jackson.CommaListJoinDeserializer; import io.druid.jackson.CommaListJoinSerializer; @@ -40,6 +41,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -50,15 +52,8 @@ public class DataSegment implements Comparable { public static String delimiter = "_"; private final Integer binaryVersion; - private static final Interner interner = Interners.newWeakInterner(); - private static final Function internFun = new Function() - { - @Override - public String apply(String input) - { - return interner.intern(input); - } - }; + private static final Interner STRING_INTERNER = Interners.newWeakInterner(); + private static final Interner> LIST_INTERNER = Interners.newWeakInterner(); public static String makeDataSegmentIdentifier( String dataSource, @@ -116,17 +111,19 @@ public boolean apply(String input) }; // dataSource, dimensions & metrics are stored as canonical string values to decrease memory required for storing large numbers of segments. - this.dataSource = interner.intern(dataSource); + this.dataSource = STRING_INTERNER.intern(dataSource); this.interval = interval; this.loadSpec = loadSpec; this.version = version; + this.dimensions = dimensions == null ? ImmutableList.of() - : ImmutableList.copyOf(Iterables.transform(Iterables.filter(dimensions, nonEmpty), internFun)); + : LIST_INTERNER.intern(Ordering.natural() + .immutableSortedCopy(Iterables.filter(dimensions, nonEmpty))); this.metrics = metrics == null ? ImmutableList.of() - : ImmutableList.copyOf(Iterables.transform(Iterables.filter(metrics, nonEmpty), internFun)); - this.shardSpec = (shardSpec == null) ? new NoneShardSpec() : shardSpec; + : LIST_INTERNER.intern(Ordering.natural().immutableSortedCopy(Iterables.filter(metrics, nonEmpty))); + this.shardSpec = (shardSpec == null) ? NoneShardSpec.INSTANCE : shardSpec; this.binaryVersion = binaryVersion; this.size = size; @@ -139,6 +136,7 @@ public boolean apply(String input) ); } + /** * Get dataSource * @@ -323,7 +321,7 @@ public Builder() this.loadSpec = ImmutableMap.of(); this.dimensions = ImmutableList.of(); this.metrics = ImmutableList.of(); - this.shardSpec = new NoneShardSpec(); + this.shardSpec = NoneShardSpec.INSTANCE; this.size = -1; } diff --git a/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java b/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java index e1b7eb429179..33863f2ca18b 100644 --- a/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java +++ b/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java @@ -19,6 +19,7 @@ package io.druid.timeline.partition; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.metamx.common.ISE; @@ -32,6 +33,11 @@ */ public class NoneShardSpec implements ShardSpec { + public final static NoneShardSpec INSTANCE = new NoneShardSpec(); + + @JsonCreator + public static NoneShardSpec instance() { return INSTANCE; } + @Override public PartitionChunk createChunk(T obj) { diff --git a/api/src/test/java/io/druid/timeline/partition/NoneShardSpecTest.java b/api/src/test/java/io/druid/timeline/partition/NoneShardSpecTest.java index 887b1f593c1d..09a91cb0dcbd 100644 --- a/api/src/test/java/io/druid/timeline/partition/NoneShardSpecTest.java +++ b/api/src/test/java/io/druid/timeline/partition/NoneShardSpecTest.java @@ -1,5 +1,8 @@ package io.druid.timeline.partition; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.TestObjectMapper; import org.junit.Assert; import org.junit.Test; @@ -13,4 +16,17 @@ public void testEqualsAndHashCode() Assert.assertEquals(one, two); Assert.assertEquals(one.hashCode(), two.hashCode()); } + + @Test + public void testSerde() throws Exception + { + final NoneShardSpec one = NoneShardSpec.INSTANCE; + ObjectMapper mapper = new TestObjectMapper(); + NoneShardSpec serde1 = mapper.readValue(mapper.writeValueAsString(one), NoneShardSpec.class); + NoneShardSpec serde2 = mapper.readValue(mapper.writeValueAsString(one), NoneShardSpec.class); + + // Serde should return same object instead of creating new one every time. + Assert.assertTrue(serde1 == serde2); + Assert.assertTrue(one == serde1); + } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 36be2c0289e6..eb64e1d3bdbc 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -305,8 +305,8 @@ private void testIngestion( Assert.assertEquals("local", dataSegment.getLoadSpec().get("type")); Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path")); Assert.assertEquals("host", dataSegment.getDimensions().get(0)); - Assert.assertEquals("visited_sum", dataSegment.getMetrics().get(0)); - Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); + Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(0)); + Assert.assertEquals("visited_sum", dataSegment.getMetrics().get(1)); Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion()); HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec(); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index f7e9872cf753..6602fe0d20dc 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -597,11 +597,11 @@ private void verifyJob(IndexGeneratorJob job) throws IOException if (datasourceName.equals("website")) { Assert.assertEquals("website", dataSegment.getDataSource()); Assert.assertEquals("host", dataSegment.getDimensions().get(0)); - Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0)); - Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); + Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(0)); + Assert.assertEquals("visited_num", dataSegment.getMetrics().get(1)); } else if (datasourceName.equals("inherit_dims")) { Assert.assertEquals("inherit_dims", dataSegment.getDataSource()); - Assert.assertEquals(ImmutableList.of("X", "Y", "M", "Q", "B", "F"), dataSegment.getDimensions()); + Assert.assertEquals(ImmutableList.of("B", "F", "M", "Q", "X", "Y"), dataSegment.getDimensions()); Assert.assertEquals("count", dataSegment.getMetrics().get(0)); } else if (datasourceName.equals("inherit_dims2")) { Assert.assertEquals("inherit_dims2", dataSegment.getDataSource());