From 0e47c9e314e952b02245a5e1a6dd1bc6d9ffabe4 Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Fri, 24 Mar 2017 02:04:30 -0700
Subject: [PATCH] Remove "granularity" from IngestSegmentFirehose.

It wasn't doing anything useful (the sequences were being concatenated,
and cursor.getTime() wasn't being called) and it defaulted to
Granularities.NONE. Changing it to Granularities.ALL gave me a 700x+
performance boost on a small dataset I was reindexing (2m27s to 365ms).
Most of that was from avoiding making a lot of unnecessary column
selectors.
---
 .../content/ingestion/update-existing-data.md |  1 -
 .../hadoop/DatasourceIngestionSpec.java       | 20 -------------------
 .../hadoop/DatasourceRecordReader.java        |  4 +---
 .../indexer/BatchDeltaIngestionTest.java      |  3 +--
 ...cUpdateDatasourcePathSpecSegmentsTest.java |  6 +-----
 .../hadoop/DatasourceIngestionSpecTest.java   |  6 +-----
 .../hadoop/DatasourceRecordReaderTest.java    |  1 -
 .../indexer/path/DatasourcePathSpecTest.java  |  1 -
 .../IngestSegmentFirehoseFactory.java         |  8 ++------
 .../firehose/IngestSegmentFirehose.java       |  7 +++----
 .../firehose/IngestSegmentFirehoseTest.java   |  3 +--
 11 files changed, 10 insertions(+), 50 deletions(-)

diff --git a/docs/content/ingestion/update-existing-data.md b/docs/content/ingestion/update-existing-data.md
index 92da8a0ae49e..0b33d02f02ba 100644
--- a/docs/content/ingestion/update-existing-data.md
+++ b/docs/content/ingestion/update-existing-data.md
@@ -52,7 +52,6 @@ Here is what goes inside `ingestionSpec`:
 |dataSource|String|Druid dataSource name from which you are loading the data.|yes|
 |intervals|List|A list of strings representing ISO-8601 Intervals.|yes|
 |segments|List|List of segments from which to read data; by default it is obtained automatically. You can obtain the list of segments to put here by making a POST query to the coordinator at url /druid/coordinator/v1/metadata/datasources/segments?full with the list of intervals specified in the request payload, e.g. ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]. You may want to provide this list manually in order to ensure that the segments read are exactly the same as they were at the time of task submission; the task would fail if the list provided by the user does not match the state of the database when the task actually runs.|no|
-|granularity|String|Defines the granularity of the query while loading data. Default value is "none". See [Granularities](../querying/granularities.html).|no|
 |filter|JSON|See [Filters](../querying/filters.html)|no|
 |dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have an explicit list of dimensions, then all the dimension columns present in the stored data will be read.|no|
 |metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no|
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java
index 66546330fa40..63d10450c2c4 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java
@@ -24,7 +24,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import io.druid.common.utils.JodaUtils;
-import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.filter.DimFilter;
 import io.druid.timeline.DataSegment;
@@ -38,7 +37,6 @@ public class DatasourceIngestionSpec
   private final List<Interval> intervals;
   private final List<DataSegment> segments;
   private final DimFilter filter;
-  private final Granularity granularity;
   private final List<String> dimensions;
   private final List<String> metrics;
   private final boolean ignoreWhenNoSegments;
@@ -50,7 +48,6 @@ public DatasourceIngestionSpec(
       @JsonProperty("intervals") List<Interval> intervals,
      @JsonProperty("segments") List<DataSegment> segments,
      @JsonProperty("filter") DimFilter filter,
-      @JsonProperty("granularity") Granularity granularity,
      @JsonProperty("dimensions") List<String> dimensions,
      @JsonProperty("metrics") List<String> metrics,
      @JsonProperty("ignoreWhenNoSegments") boolean ignoreWhenNoSegments
@@ -77,8 +74,6 @@ public DatasourceIngestionSpec(
     this.segments = segments;
     this.filter = filter;
 
-    this.granularity = granularity == null ? Granularities.NONE : granularity;
-
     this.dimensions = dimensions;
     this.metrics = metrics;
 
@@ -109,12 +104,6 @@ public DimFilter getFilter()
     return filter;
   }
 
-  @JsonProperty
-  public Granularity getGranularity()
-  {
-    return granularity;
-  }
-
   @JsonProperty
   public List<String> getDimensions()
   {
@@ -141,7 +130,6 @@ public DatasourceIngestionSpec withDimensions(List<String> dimensions)
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments
@@ -156,7 +144,6 @@ public DatasourceIngestionSpec withMetrics(List<String> metrics)
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments
@@ -171,7 +158,6 @@ public DatasourceIngestionSpec withQueryGranularity(Granularity granularity)
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments
@@ -186,7 +172,6 @@ public DatasourceIngestionSpec withIgnoreWhenNoSegments(boolean ignoreWhenNoSegm
         intervals,
         segments,
         filter,
-        granularity,
         dimensions,
         metrics,
         ignoreWhenNoSegments
@@ -220,9 +205,6 @@ public boolean equals(Object o)
     if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
       return false;
     }
-    if (!granularity.equals(that.granularity)) {
-      return false;
-    }
     if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
       return false;
     }
@@ -237,7 +219,6 @@ public int hashCode()
     result = 31 * result + intervals.hashCode();
     result = 31 * result + (segments != null ? segments.hashCode() : 0);
     result = 31 * result + (filter != null ? filter.hashCode() : 0);
-    result = 31 * result + granularity.hashCode();
     result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
     result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
     result = 31 * result + (ignoreWhenNoSegments ? 1 : 0);
@@ -252,7 +233,6 @@ public String toString()
            ", intervals=" + intervals +
            ", segments=" + segments +
            ", filter=" + filter +
-           ", granularity=" + granularity +
            ", dimensions=" + dimensions +
            ", metrics=" + metrics +
            ", ignoreWhenNoSegments=" + ignoreWhenNoSegments +
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java
index f18929f8902b..be6eb64a268a 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java
@@ -26,7 +26,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
-
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.data.input.MapBasedRow;
@@ -111,8 +110,7 @@ public WindowedStorageAdapter apply(WindowedDataSegment segment)
         adapters,
         spec.getDimensions(),
         spec.getMetrics(),
-        spec.getFilter(),
-        spec.getGranularity()
+        spec.getFilter()
     );
   }
 
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java
index 00300c32ba41..7201bd119133 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java
@@ -322,8 +322,7 @@ private void testIngestion(
         ImmutableList.of(new WindowedStorageAdapter(adapter, windowedDataSegment.getInterval())),
         ImmutableList.of("host"),
         ImmutableList.of("visited_sum", "unique_hosts"),
-        null,
-        Granularities.NONE
+        null
     );
 
     List<InputRow> rows = Lists.newArrayList();
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java
index 4ab0b9f30a3f..9154410875c0 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java
@@ -92,7 +92,7 @@ public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePat
     PathSpec pathSpec = new DatasourcePathSpec(
         jsonMapper,
         null,
-        new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, null, false),
+        new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null, null, false),
         null
     );
     HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(
@@ -119,7 +119,6 @@ public void testupdateSegmentListIfDatasourcePathSpecWithMatchingUserSegments()
             null,
             null,
             null,
-            null,
             false
         ),
         null
@@ -148,7 +147,6 @@ public void testupdateSegmentListThrowsExceptionWithUserSegmentsMismatch() throw
             null,
             null,
             null,
-            null,
             false
         ),
         null
@@ -174,7 +172,6 @@ public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePat
             null,
             null,
             null,
-            null,
             false
         ),
         null
@@ -206,7 +203,6 @@ public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithMultiplePathSpec(
             null,
             null,
             null,
-            null,
             false
         ),
         null
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java
index 3c99203d9337..e55b6309a20c 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import io.druid.java.util.common.granularity.Granularities;
 import io.druid.query.filter.SelectorDimFilter;
 import io.druid.segment.TestHelper;
 import io.druid.timeline.DataSegment;
@@ -49,7 +48,6 @@ public void testSingleIntervalSerde() throws Exception
         null,
         null,
         new SelectorDimFilter("dim", "value", null),
-        Granularities.DAY,
         Lists.newArrayList("d1", "d2"),
         Lists.newArrayList("m1", "m2", "m3"),
         false
@@ -86,7 +84,6 @@ public void testMultiIntervalSerde() throws Exception
         null,
         null,
         null,
-        null,
         false
     );
 
@@ -133,7 +130,6 @@ public void testMultiIntervalSerde() throws Exception
             )
         ),
         new SelectorDimFilter("dim", "value", null),
-        Granularities.DAY,
         Lists.newArrayList("d1", "d2"),
         Lists.newArrayList("m1", "m2", "m3"),
         true
@@ -156,7 +152,7 @@ public void testOldJsonDeserialization() throws Exception
     DatasourceIngestionSpec actual = MAPPER.readValue(jsonStr, DatasourceIngestionSpec.class);
 
     Assert.assertEquals(
-        new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, null, false),
+        new DatasourceIngestionSpec("test", Interval.parse("2014/2015"), null, null, null, null, null, false),
         actual
     );
   }
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java
index c0ae5ecf69a7..b910e9b49b00 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java
@@ -67,7 +67,6 @@ public void testSanity() throws Exception
         null,
         null,
         null,
-        null,
         segment.getDimensions(),
         segment.getMetrics(),
         false
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java
index 1766c73b5d68..f1c7d558c909 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java
@@ -79,7 +79,6 @@ public DatasourcePathSpecTest()
         null,
         null,
         null,
-        null,
         false
     );
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 8897e47af3b9..25028f0f5c0f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -38,7 +38,6 @@
 import io.druid.indexing.common.TaskToolboxFactory;
 import io.druid.indexing.common.actions.SegmentListUsedAction;
 import io.druid.indexing.common.task.NoopTask;
-import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.parsers.ParseException;
 import io.druid.query.filter.DimFilter;
 import io.druid.segment.IndexIO;
@@ -282,12 +281,9 @@ public WindowedStorageAdapter apply(final PartitionChunk<DataSegment> input)
                 )
             )
         );
 
-        return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter, Granularities.NONE);
+        return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter);
       }
-      catch (IOException e) {
-        throw Throwables.propagate(e);
-      }
-      catch (SegmentLoadingException e) {
+      catch (IOException | SegmentLoadingException e) {
         throw Throwables.propagate(e);
       }
diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java
index 7566b3b47342..fe975c5ed779 100644
--- a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java
+++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java
@@ -26,7 +26,7 @@
 import io.druid.data.input.Firehose;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
-import io.druid.java.util.common.granularity.Granularity;
+import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.guava.Yielder;
@@ -59,8 +59,7 @@ public IngestSegmentFirehose(
       final List<WindowedStorageAdapter> adapters,
       final List<String> dims,
       final List<String> metrics,
-      final DimFilter dimFilter,
-      final Granularity granularity
+      final DimFilter dimFilter
   )
   {
     Sequence<InputRow> rows = Sequences.concat(
@@ -77,7 +76,7 @@ public Sequence<InputRow> apply(WindowedStorageAdapter adapter)
                         Filters.toFilter(dimFilter),
                         adapter.getInterval(),
                         VirtualColumns.EMPTY,
-                        granularity,
+                        Granularities.ALL,
                         false
                     ),
                     new Function<Cursor, Sequence<InputRow>>() {
diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
index 6bdaf2794079..ea8b5590701a 100644
--- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
@@ -72,8 +72,7 @@ public void testSanity() throws Exception
         ImmutableList.of(wsa, wsa),
         ImmutableList.of("host"),
         ImmutableList.of("visited_sum", "unique_hosts"),
-        null,
-        Granularities.NONE
+        null
    );

    int count = 0;
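
Reviewer note, not part of the patch: the speedup comes from cursor fan-out. With Granularities.NONE, makeCursors emits one cursor per distinct time bucket (effectively one per distinct row timestamp), and each cursor pays the fixed setup cost of building column selectors for every requested dimension and metric; with Granularities.ALL it emits a single cursor per segment scan, so that cost is paid once. The self-contained Java sketch below models only that bucketing effect. The class and method names are hypothetical stand-ins, not Druid APIs.

// CursorCountSketch.java -- hypothetical toy model, not Druid code.
// Counts "cursor" creations under NONE-like vs. ALL-like bucketing;
// each creation stands in for Druid's per-cursor setup cost
// (building column selectors for every dimension and metric).
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class CursorCountSketch
{
  private static int cursorsCreated = 0;

  // Stand-in for cursor construction; in Druid this is where the
  // column selectors get built, which is the cost the patch avoids repeating.
  private static void makeCursor(List<Long> bucket)
  {
    cursorsCreated++;
  }

  // NONE-like: one bucket, and therefore one cursor, per distinct timestamp.
  private static void scanNone(long[] timestamps)
  {
    TreeMap<Long, List<Long>> buckets = new TreeMap<>();
    for (long t : timestamps) {
      buckets.computeIfAbsent(t, k -> new ArrayList<>()).add(t);
    }
    for (List<Long> bucket : buckets.values()) {
      makeCursor(bucket);
    }
  }

  // ALL-like: every row lands in a single bucket, so one cursor total.
  private static void scanAll(long[] timestamps)
  {
    List<Long> all = new ArrayList<>();
    for (long t : timestamps) {
      all.add(t);
    }
    makeCursor(all);
  }

  public static void main(String[] args)
  {
    long[] timestamps = new long[100_000];
    for (int i = 0; i < timestamps.length; i++) {
      timestamps[i] = i; // every row has a distinct timestamp
    }

    cursorsCreated = 0;
    scanNone(timestamps);
    System.out.println("NONE-like bucketing: " + cursorsCreated + " cursors");

    cursorsCreated = 0;
    scanAll(timestamps);
    System.out.println("ALL-like bucketing:  " + cursorsCreated + " cursor");
  }
}

Run against rows with distinct timestamps, the NONE-like scan creates 100,000 cursors versus one for the ALL-like scan; that is the same shape of blow-up as the 2m27s to 365ms observation in the commit message.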