From 4d4aa8bfc6cc72eaf45bdc87c7cdb2c4c20decda Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 18 May 2015 13:42:30 -0500 Subject: [PATCH 1/7] refactor IngestSegmentFirehoseFactory so that IngestSegmentFirehose becomes reusable Conflicts: indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java --- .../IngestSegmentFirehoseFactory.java | 172 +-------------- .../IngestSegmentFirehoseFactoryTest.java | 5 +- .../firehose/IngestSegmentFirehose.java | 202 ++++++++++++++++++ 3 files changed, 207 insertions(+), 172 deletions(-) create mode 100644 server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java diff --git a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java index 9d510c14300f..d620d90dc855 100644 --- a/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java @@ -25,20 +25,13 @@ import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.inject.Injector; -import com.metamx.common.guava.Sequence; -import com.metamx.common.guava.Sequences; -import com.metamx.common.guava.Yielder; -import com.metamx.common.guava.YieldingAccumulator; import com.metamx.common.parsers.ParseException; import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; -import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; import io.druid.indexing.common.TaskToolbox; @@ -46,30 +39,19 @@ import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.task.NoopTask; import io.druid.query.filter.DimFilter; -import io.druid.query.select.EventHolder; -import io.druid.segment.Cursor; -import io.druid.segment.DimensionSelector; import io.druid.segment.IndexIO; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.ObjectColumnSelector; import io.druid.segment.QueryableIndexStorageAdapter; import io.druid.segment.StorageAdapter; -import io.druid.segment.column.Column; -import io.druid.segment.data.IndexedInts; -import io.druid.segment.filter.Filters; import io.druid.segment.loading.SegmentLoadingException; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; -import io.druid.utils.Runnables; -import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -286,7 +268,7 @@ public StorageAdapter apply(PartitionChunk input) ) ); - return new IngestSegmentFirehose(adapters, dims, metricsList); + return new IngestSegmentFirehose(adapters, dims, metricsList, dimFilter, interval, QueryGranularity.NONE); } catch (IOException e) { @@ -297,154 +279,4 @@ public StorageAdapter apply(PartitionChunk input) } } - - public class IngestSegmentFirehose implements Firehose - { - private volatile Yielder rowYielder; - - public IngestSegmentFirehose(List adapters, final List dims, final List metrics) - { - Sequence rows = Sequences.concat( - Iterables.transform( - adapters, new Function>() - { - @Nullable - @Override - public Sequence apply(StorageAdapter adapter) - { - return Sequences.concat( - Sequences.map( - adapter.makeCursors( - Filters.convertDimensionFilters(dimFilter), - interval, - QueryGranularity.ALL - ), new Function>() - { - @Nullable - @Override - public Sequence apply(final Cursor cursor) - { - final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); - - final Map dimSelectors = Maps.newHashMap(); - for (String dim : dims) { - final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null); - // dimSelector is null if the dimension is not present - if (dimSelector != null) { - dimSelectors.put(dim, dimSelector); - } - } - - final Map metSelectors = Maps.newHashMap(); - for (String metric : metrics) { - final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); - if (metricSelector != null) { - metSelectors.put(metric, metricSelector); - } - } - - return Sequences.simple( - new Iterable() - { - @Override - public Iterator iterator() - { - return new Iterator() - { - @Override - public boolean hasNext() - { - return !cursor.isDone(); - } - - @Override - public InputRow next() - { - final Map theEvent = Maps.newLinkedHashMap(); - final long timestamp = timestampColumnSelector.get(); - theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); - - for (Map.Entry dimSelector : dimSelectors.entrySet()) { - final String dim = dimSelector.getKey(); - final DimensionSelector selector = dimSelector.getValue(); - final IndexedInts vals = selector.getRow(); - - if (vals.size() == 1) { - final String dimVal = selector.lookupName(vals.get(0)); - theEvent.put(dim, dimVal); - } else { - List dimVals = Lists.newArrayList(); - for (int i = 0; i < vals.size(); ++i) { - dimVals.add(selector.lookupName(vals.get(i))); - } - theEvent.put(dim, dimVals); - } - } - - for (Map.Entry metSelector : metSelectors.entrySet()) { - final String metric = metSelector.getKey(); - final ObjectColumnSelector selector = metSelector.getValue(); - theEvent.put(metric, selector.get()); - } - cursor.advance(); - return new MapBasedInputRow(timestamp, dims, theEvent); - } - - @Override - public void remove() - { - throw new UnsupportedOperationException("Remove Not Supported"); - } - }; - } - } - ); - } - } - ) - ); - } - } - ) - ); - rowYielder = rows.toYielder( - null, - new YieldingAccumulator() - { - @Override - public InputRow accumulate(InputRow accumulated, InputRow in) - { - yield(); - return in; - } - } - ); - } - - @Override - public boolean hasMore() - { - return !rowYielder.isDone(); - } - - @Override - public InputRow nextRow() - { - final InputRow inputRow = rowYielder.get(); - rowYielder = rowYielder.next(null); - return inputRow; - } - - @Override - public Runnable commit() - { - return Runnables.getNoopRunnable(); - } - - @Override - public void close() throws IOException - { - rowYielder.close(); - } - } } diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 7bb22af7fc34..c632c84ad6d6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -74,6 +74,7 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Interval; @@ -461,8 +462,8 @@ public void simpleFirehoseReadingTest() throws IOException { Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size()); Integer rowcount = 0; - try (final IngestSegmentFirehoseFactory.IngestSegmentFirehose firehose = - (IngestSegmentFirehoseFactory.IngestSegmentFirehose) + try (final IngestSegmentFirehose firehose = + (IngestSegmentFirehose) factory.connect(rowParser)) { while (firehose.hasMore()) { InputRow row = firehose.nextRow(); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java new file mode 100644 index 000000000000..bc7da0bfcf23 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/firehose/IngestSegmentFirehose.java @@ -0,0 +1,202 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.segment.realtime.firehose; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.Yielder; +import com.metamx.common.guava.YieldingAccumulator; +import io.druid.data.input.Firehose; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.granularity.QueryGranularity; +import io.druid.query.filter.DimFilter; +import io.druid.query.select.EventHolder; +import io.druid.segment.Cursor; +import io.druid.segment.DimensionSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.StorageAdapter; +import io.druid.segment.column.Column; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.filter.Filters; +import io.druid.utils.Runnables; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class IngestSegmentFirehose implements Firehose +{ + private volatile Yielder rowYielder; + + public IngestSegmentFirehose(List adapters, final List dims, final List metrics, final DimFilter dimFilter, final Interval interval, final QueryGranularity granularity) + { + Sequence rows = Sequences.concat( + Iterables.transform( + adapters, new Function>() + { + @Nullable + @Override + public Sequence apply(StorageAdapter adapter) + { + return Sequences.concat( + Sequences.map( + adapter.makeCursors( + Filters.convertDimensionFilters(dimFilter), + interval, + granularity + ), new Function>() + { + @Nullable + @Override + public Sequence apply(final Cursor cursor) + { + final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); + + final Map dimSelectors = Maps.newHashMap(); + for (String dim : dims) { + final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim, null); + // dimSelector is null if the dimension is not present + if (dimSelector != null) { + dimSelectors.put(dim, dimSelector); + } + } + + final Map metSelectors = Maps.newHashMap(); + for (String metric : metrics) { + final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric); + if (metricSelector != null) { + metSelectors.put(metric, metricSelector); + } + } + + return Sequences.simple( + new Iterable() + { + @Override + public Iterator iterator() + { + return new Iterator() + { + @Override + public boolean hasNext() + { + return !cursor.isDone(); + } + + @Override + public InputRow next() + { + final Map theEvent = Maps.newLinkedHashMap(); + final long timestamp = timestampColumnSelector.get(); + theEvent.put(EventHolder.timestampKey, new DateTime(timestamp)); + + for (Map.Entry dimSelector : dimSelectors.entrySet()) { + final String dim = dimSelector.getKey(); + final DimensionSelector selector = dimSelector.getValue(); + final IndexedInts vals = selector.getRow(); + + if (vals.size() == 1) { + final String dimVal = selector.lookupName(vals.get(0)); + theEvent.put(dim, dimVal); + } else { + List dimVals = Lists.newArrayList(); + for (int i = 0; i < vals.size(); ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + theEvent.put(dim, dimVals); + } + } + + for (Map.Entry metSelector : metSelectors.entrySet()) { + final String metric = metSelector.getKey(); + final ObjectColumnSelector selector = metSelector.getValue(); + theEvent.put(metric, selector.get()); + } + cursor.advance(); + return new MapBasedInputRow(timestamp, dims, theEvent); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException("Remove Not Supported"); + } + }; + } + } + ); + } + } + ) + ); + } + } + ) + ); + rowYielder = rows.toYielder( + null, + new YieldingAccumulator() + { + @Override + public InputRow accumulate(InputRow accumulated, InputRow in) + { + yield(); + return in; + } + } + ); + } + + @Override + public boolean hasMore() + { + return !rowYielder.isDone(); + } + + @Override + public InputRow nextRow() + { + final InputRow inputRow = rowYielder.get(); + rowYielder = rowYielder.next(null); + return inputRow; + } + + @Override + public Runnable commit() + { + return Runnables.getNoopRunnable(); + } + + @Override + public void close() throws IOException + { + rowYielder.close(); + } +} From f1d309a67187f270f663937a2e61945ce88e41fa Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 17 Jul 2015 17:20:16 -0500 Subject: [PATCH 2/7] do not run parser if value from InputFormat is already an InputRow --- .../main/java/io/druid/indexer/HadoopDruidIndexerMapper.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java index e90a0c590187..803e527b4dd9 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java @@ -103,7 +103,9 @@ public final static InputRow parseInputRow(Object value, InputRowParser parser) { if(parser instanceof StringInputRowParser && value instanceof Text) { //Note: This is to ensure backward compatibility with 0.7.0 and before - return ((StringInputRowParser)parser).parse(value.toString()); + return ((StringInputRowParser) parser).parse(value.toString()); + } else if(value instanceof InputRow) { + return (InputRow)value; } else { return parser.parse(value); } From 1ae56f139bd79b378c779ebf47154a0ed346fae5 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 25 Jun 2015 16:10:28 -0500 Subject: [PATCH 3/7] Druid Hadoop InputFormat and pathSpec Conflicts: indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java indexing-service/pom.xml --- .../io/druid/indexer/IndexGeneratorJob.java | 16 +- .../main/java/io/druid/indexer/JobHelper.java | 36 +++ .../hadoop/DatasourceIngestionSpec.java | 160 ++++++++++++ .../indexer/hadoop/DatasourceInputFormat.java | 121 +++++++++ .../indexer/hadoop/DatasourceInputSplit.java | 88 +++++++ .../hadoop/DatasourceRecordReader.java | 194 +++++++++++++++ .../druid/indexer/hadoop/SegmentInputRow.java | 97 ++++++++ .../indexer/path/DatasourcePathSpec.java | 184 ++++++++++++++ .../java/io/druid/indexer/path/PathSpec.java | 3 +- .../indexer/updater/HadoopConverterJob.java | 93 +------ .../hadoop/DatasourceIngestionSpecTest.java | 52 ++++ .../hadoop/DatasourceInputFormatTest.java | 154 ++++++++++++ .../hadoop/DatasourceInputSplitTest.java | 71 ++++++ .../hadoop/DatasourceRecordReaderTest.java | 136 +++++++++++ .../indexer/path/DatasourcePathSpecTest.java | 229 ++++++++++++++++++ .../resources/test-segment/descriptor.json | 17 ++ .../src/test/resources/test-segment/index.zip | Bin 0 -> 1019 bytes .../src/test/resources/test-segment/note | 5 + 18 files changed, 1567 insertions(+), 89 deletions(-) create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java create mode 100644 indexing-hadoop/src/test/resources/test-segment/descriptor.json create mode 100644 indexing-hadoop/src/test/resources/test-segment/index.zip create mode 100644 indexing-hadoop/src/test/resources/test-segment/note diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 562302518280..08d852046fff 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -35,6 +35,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.data.input.Rows; +import io.druid.indexer.hadoop.SegmentInputRow; import io.druid.offheap.OffheapBufferPool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.IndexIO; @@ -235,6 +236,7 @@ public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper loadSpec = dataSegment.getLoadSpec(); + final String type = loadSpec.get("type").toString(); + final URI segmentLocURI; + if ("s3_zip".equals(type)) { + segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); + } else if ("hdfs".equals(type)) { + segmentLocURI = URI.create(loadSpec.get("path").toString()); + } else if ("local".equals(type)) { + try { + segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null); + } + catch (URISyntaxException e) { + throw new ISE(e, "Unable to form simple file uri"); + } + } else { + try { + throw new IAE( + "Cannot figure out loadSpec %s", + HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec) + ); + } + catch (JsonProcessingException e) { + throw new ISE("Cannot write Map with json mapper"); + } + } + return segmentLocURI; + } + public static ProgressIndicator progressIndicatorForContext( final TaskAttemptContext context ) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java new file mode 100644 index 000000000000..4bff88c3e798 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceIngestionSpec.java @@ -0,0 +1,160 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import io.druid.granularity.QueryGranularity; +import io.druid.query.filter.DimFilter; +import org.joda.time.Interval; + +import java.util.List; + +public class DatasourceIngestionSpec +{ + private final String dataSource; + private final Interval interval; + private final DimFilter filter; + private final QueryGranularity granularity; + private final List dimensions; + private final List metrics; + + @JsonCreator + public DatasourceIngestionSpec( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval, + @JsonProperty("filter") DimFilter filter, + @JsonProperty("granularity") QueryGranularity granularity, + @JsonProperty("dimensions") List dimensions, + @JsonProperty("metrics") List metrics + ) + { + this.dataSource = Preconditions.checkNotNull(dataSource, "null dataSource"); + this.interval = Preconditions.checkNotNull(interval, "null interval"); + this.filter = filter; + this.granularity = granularity == null ? QueryGranularity.NONE : granularity; + + this.dimensions = dimensions; + this.metrics = metrics; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty + public DimFilter getFilter() + { + return filter; + } + + @JsonProperty + public QueryGranularity getGranularity() + { + return granularity; + } + + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @JsonProperty + public List getMetrics() + { + return metrics; + } + + public DatasourceIngestionSpec withDimensions(List dimensions) + { + return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics); + } + + public DatasourceIngestionSpec withMetrics(List metrics) + { + return new DatasourceIngestionSpec(dataSource, interval, filter, granularity, dimensions, metrics); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DatasourceIngestionSpec that = (DatasourceIngestionSpec) o; + + if (!dataSource.equals(that.dataSource)) { + return false; + } + if (!interval.equals(that.interval)) { + return false; + } + if (filter != null ? !filter.equals(that.filter) : that.filter != null) { + return false; + } + if (!granularity.equals(that.granularity)) { + return false; + } + if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) { + return false; + } + return !(metrics != null ? !metrics.equals(that.metrics) : that.metrics != null); + + } + + @Override + public int hashCode() + { + int result = dataSource.hashCode(); + result = 31 * result + interval.hashCode(); + result = 31 * result + (filter != null ? filter.hashCode() : 0); + result = 31 * result + granularity.hashCode(); + result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); + result = 31 * result + (metrics != null ? metrics.hashCode() : 0); + return result; + } + + @Override + public String toString() + { + return "DatasourceIngestionSpec{" + + "dataSource='" + dataSource + '\'' + + ", interval=" + interval + + ", filter=" + filter + + ", granularity=" + granularity + + ", dimensions=" + dimensions + + ", metrics=" + metrics + + '}'; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java new file mode 100644 index 000000000000..3677612bc3aa --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java @@ -0,0 +1,121 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.data.input.InputRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +public class DatasourceInputFormat extends InputFormat +{ + private static final Logger logger = new Logger(DatasourceInputFormat.class); + + public static final String CONF_INPUT_SEGMENTS = "druid.segments"; + public static final String CONF_DRUID_SCHEMA = "druid.datasource.schema"; + public static final String CONF_MAX_SPLIT_SIZE = "druid.datasource.split.max.size"; + + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException + { + Configuration conf = context.getConfiguration(); + + String segmentsStr = Preconditions.checkNotNull(conf.get(CONF_INPUT_SEGMENTS), "No segments found to read"); + List segments = HadoopDruidIndexerConfig.jsonMapper.readValue( + segmentsStr, + new TypeReference>() + { + } + ); + if (segments == null || segments.size() == 0) { + throw new ISE("No segments found to read"); + } + + logger.info("segments to read [%s]", segmentsStr); + + long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0); + + if (maxSize > 0) { + //combining is to happen, let us sort the segments list by size so that they + //are combined appropriately + Collections.sort( + segments, + new Comparator() + { + @Override + public int compare(DataSegment s1, DataSegment s2) + { + return Long.compare(s1.getSize(), s2.getSize()); + } + } + ); + } + + List splits = Lists.newArrayList(); + + List list = new ArrayList<>(); + long size = 0; + + for (DataSegment segment : segments) { + if (size + segment.getSize() > maxSize && size > 0) { + splits.add(new DatasourceInputSplit(list)); + list = Lists.newArrayList(); + size = 0; + } + + list.add(segment); + size += segment.getSize(); + } + + if (list.size() > 0) { + splits.add(new DatasourceInputSplit(list)); + } + + logger.info("Number of splits [%d]", splits.size()); + return splits; + } + + @Override + public RecordReader createRecordReader( + InputSplit split, + TaskAttemptContext context + ) throws IOException, InterruptedException + { + return new DatasourceRecordReader(); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java new file mode 100644 index 000000000000..6b80159ffb38 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java @@ -0,0 +1,88 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +import javax.validation.constraints.NotNull; + +public class DatasourceInputSplit extends InputSplit implements Writable +{ + private List segments = null; + + //required for deserialization + public DatasourceInputSplit() + { + } + + public DatasourceInputSplit(@NotNull List segments) + { + Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments"); + this.segments = segments; + } + + @Override + public long getLength() throws IOException, InterruptedException + { + long size = 0; + for (DataSegment segment : segments) { + size += segment.getSize(); + } + return size; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException + { + return new String[]{}; + } + + public List getSegments() + { + return segments; + } + + @Override + public void write(DataOutput out) throws IOException + { + out.writeUTF(HadoopDruidIndexerConfig.jsonMapper.writeValueAsString(segments)); + } + + @Override + public void readFields(DataInput in) throws IOException + { + segments = HadoopDruidIndexerConfig.jsonMapper.readValue( + in.readUTF(), + new TypeReference>() + { + } + ); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java new file mode 100644 index 000000000000..ef20db3ed520 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java @@ -0,0 +1,194 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.io.Closeables; +import com.google.common.io.Files; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.MapBasedRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.JobHelper; +import io.druid.segment.IndexIO; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexStorageAdapter; +import io.druid.segment.StorageAdapter; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; +import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +public class DatasourceRecordReader extends RecordReader +{ + private static final Logger logger = new Logger(DatasourceRecordReader.class); + + private DatasourceIngestionSpec spec; + private IngestSegmentFirehose firehose; + + private int rowNum; + private MapBasedRow currRow; + + private List indexes = Lists.newArrayList(); + private List tmpSegmentDirs = Lists.newArrayList(); + private int numRows; + + @Override + public void initialize(InputSplit split, final TaskAttemptContext context) throws IOException, InterruptedException + { + spec = readAndVerifyDatasourceIngestionSpec(context.getConfiguration(), HadoopDruidIndexerConfig.jsonMapper); + + List segments = ((DatasourceInputSplit) split).getSegments(); + + List adapters = Lists.transform( + segments, + new Function() + { + @Override + public StorageAdapter apply(DataSegment segment) + { + try { + logger.info("Getting storage path for segment [%s]", segment.getIdentifier()); + Path path = new Path(JobHelper.getURIFromSegment(segment)); + + logger.info("Fetch segment files from [%s]", path); + + File dir = Files.createTempDir(); + tmpSegmentDirs.add(dir); + logger.info("Locally storing fetched segment at [%s]", dir); + + JobHelper.unzipNoGuava(path, context.getConfiguration(), dir, context); + logger.info("finished fetching segment files"); + + QueryableIndex index = IndexIO.loadIndex(dir); + indexes.add(index); + numRows += index.getNumRows(); + + return new QueryableIndexStorageAdapter(index); + } + catch (IOException ex) { + throw Throwables.propagate(ex); + } + } + } + ); + + firehose = new IngestSegmentFirehose( + adapters, + spec.getDimensions(), + spec.getMetrics(), + spec.getFilter(), + spec.getInterval(), + spec.getGranularity() + ); + + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException + { + if (firehose.hasMore()) { + currRow = (MapBasedRow) firehose.nextRow(); + rowNum++; + return true; + } else { + return false; + } + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException + { + return NullWritable.get(); + } + + @Override + public InputRow getCurrentValue() throws IOException, InterruptedException + { + return new SegmentInputRow( + new MapBasedInputRow( + currRow.getTimestamp(), + spec.getDimensions(), + currRow.getEvent() + ) + ); + } + + @Override + public float getProgress() throws IOException, InterruptedException + { + if (numRows > 0) { + return (rowNum * 1.0f) / numRows; + } else { + return 0; + } + } + + @Override + public void close() throws IOException + { + Closeables.close(firehose, true); + for (QueryableIndex qi : indexes) { + Closeables.close(qi, true); + } + + for (File dir : tmpSegmentDirs) { + FileUtils.deleteDirectory(dir); + } + } + + private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration config, ObjectMapper jsonMapper) + { + try { + String schema = Preconditions.checkNotNull(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), "null schema"); + logger.info("load schema [%s]", schema); + + DatasourceIngestionSpec spec = jsonMapper.readValue(schema, DatasourceIngestionSpec.class); + + if (spec.getDimensions() == null || spec.getDimensions().size() == 0) { + throw new ISE("load schema does not have dimensions"); + } + + if (spec.getMetrics() == null || spec.getMetrics().size() == 0) { + throw new ISE("load schema does not have metrics"); + } + + return spec; + } + catch (IOException ex) { + throw new RuntimeException("couldn't load segment load spec", ex); + } + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java new file mode 100644 index 000000000000..b25e95918cf7 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/SegmentInputRow.java @@ -0,0 +1,97 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import io.druid.data.input.InputRow; +import io.druid.data.input.Row; +import org.joda.time.DateTime; + +import java.util.List; + +/** + * SegmentInputRow serves as a marker that these InputRow instances have already been combined + * and they contain the columns as they show up in the segment after ingestion, not what you would see in raw + * data. + * It must only be used to represent such InputRows. + */ +public class SegmentInputRow implements InputRow +{ + private final InputRow delegate; + + public SegmentInputRow(InputRow delegate){ + this.delegate = delegate; + } + + @Override + public List getDimensions() + { + return delegate.getDimensions(); + } + + @Override + public long getTimestampFromEpoch() + { + return delegate.getTimestampFromEpoch(); + } + + @Override + public DateTime getTimestamp() + { + return delegate.getTimestamp(); + } + + @Override + public List getDimension(String dimension) + { + return delegate.getDimension(dimension); + } + + @Override + public Object getRaw(String dimension) + { + return delegate.getRaw(dimension); + } + + @Override + public float getFloatMetric(String metric) + { + return delegate.getFloatMetric(metric); + } + + @Override + public long getLongMetric(String metric) + { + return delegate.getLongMetric(metric); + } + + @Override + public int compareTo(Row row) + { + return delegate.compareTo(row); + } + + @Override + public String toString() + { + return "SegmentInputRow{" + + "delegate=" + delegate + + '}'; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java new file mode 100644 index 000000000000..6d912eeb65c1 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -0,0 +1,184 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.common.logger.Logger; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.hadoop.DatasourceInputFormat; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public class DatasourcePathSpec implements PathSpec +{ + private static final Logger logger = new Logger(DatasourcePathSpec.class); + + private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; + private ObjectMapper mapper; + + private DatasourceIngestionSpec ingestionSpec; + private long maxSplitSize; + + public DatasourcePathSpec( + @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + @JacksonInject ObjectMapper mapper, + @JsonProperty("ingestionSpec") DatasourceIngestionSpec spec, + @JsonProperty("maxSplitSize") Long maxSplitSize + ) + { + this.indexerMetadataStorageCoordinator = Preconditions.checkNotNull(indexerMetadataStorageCoordinator, "null indexerMetadataStorageCoordinator"); + this.mapper = Preconditions.checkNotNull(mapper, "null mapper"); + this.ingestionSpec = Preconditions.checkNotNull(spec, "null ingestionSpec"); + + if(maxSplitSize == null) { + this.maxSplitSize = 0; + } else { + this.maxSplitSize = maxSplitSize.longValue(); + } + } + + @JsonProperty + public DatasourceIngestionSpec getIngestionSpec() + { + return ingestionSpec; + } + + @JsonProperty + public long getMaxSplitSize() + { + return maxSplitSize; + } + + @Override + public Job addInputPaths( + HadoopDruidIndexerConfig config, Job job + ) throws IOException + { + final List segments = indexerMetadataStorageCoordinator.getUsedSegmentsForInterval( + ingestionSpec.getDataSource(), + ingestionSpec.getInterval() + ); + logger.info( + "Found total [%d] segments for [%s] in interval [%s]", + segments.size(), + ingestionSpec.getDataSource(), + ingestionSpec.getInterval() + ); + + if (ingestionSpec.getDimensions() == null) { + List dims; + if (config.getParser().getParseSpec().getDimensionsSpec().hasCustomDimensions()) { + dims = config.getParser().getParseSpec().getDimensionsSpec().getDimensions(); + } else { + Set dimSet = Sets.newHashSet( + Iterables.concat( + Iterables.transform( + segments, + new Function>() + { + @Override + public Iterable apply(DataSegment dataSegment) + { + return dataSegment.getDimensions(); + } + } + ) + ) + ); + dims = Lists.newArrayList( + Sets.difference( + dimSet, + config.getParser() + .getParseSpec() + .getDimensionsSpec() + .getDimensionExclusions() + ) + ); + } + ingestionSpec = ingestionSpec.withDimensions(dims); + } + + if (ingestionSpec.getMetrics() == null) { + Set metrics = Sets.newHashSet(); + final AggregatorFactory[] cols = config.getSchema().getDataSchema().getAggregators(); + if (cols != null) { + for (AggregatorFactory col : cols) { + metrics.add(col.getName()); + } + } + ingestionSpec = ingestionSpec.withMetrics(Lists.newArrayList(metrics)); + } + + job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(ingestionSpec)); + job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments)); + job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize)); + + return job; + } + + public Class getInputFormat() + { + return DatasourceInputFormat.class; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DatasourcePathSpec that = (DatasourcePathSpec) o; + + if (maxSplitSize != that.maxSplitSize) { + return false; + } + return ingestionSpec.equals(that.ingestionSpec); + + } + + @Override + public int hashCode() + { + int result = ingestionSpec.hashCode(); + result = 31 * result + (int) (maxSplitSize ^ (maxSplitSize >>> 32)); + return result; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java index 90feb83bc956..f1de51316892 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java @@ -32,7 +32,8 @@ @JsonSubTypes(value={ @JsonSubTypes.Type(name="granular_unprocessed", value=GranularUnprocessedPathSpec.class), @JsonSubTypes.Type(name="granularity", value=GranularityPathSpec.class), - @JsonSubTypes.Type(name="static", value=StaticPathSpec.class) + @JsonSubTypes.Type(name="static", value=StaticPathSpec.class), + @JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class) }) public interface PathSpec { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java index fb3c63eae0f4..02125160ffa9 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java @@ -32,6 +32,7 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import io.druid.indexer.JobHelper; +import io.druid.indexer.hadoop.DatasourceInputSplit; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.timeline.DataSegment; @@ -42,7 +43,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapreduce.InputFormat; @@ -62,15 +62,10 @@ import org.apache.hadoop.util.Progressable; import javax.annotation.Nullable; -import javax.validation.constraints.NotNull; -import java.io.DataInput; -import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.URI; -import java.net.URISyntaxException; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; @@ -470,10 +465,10 @@ protected void map( ) throws IOException, InterruptedException { final InputSplit split = context.getInputSplit(); - if (!(split instanceof DataSegmentSplit)) { + if (!(split instanceof DatasourceInputSplit)) { throw new IAE( "Unexpected split type. Expected [%s] was [%s]", - DataSegmentSplit.class.getCanonicalName(), + DatasourceInputSplit.class.getCanonicalName(), split.getClass().getCanonicalName() ); } @@ -481,13 +476,13 @@ protected void map( final String tmpDirLoc = context.getConfiguration().get(TMP_FILE_LOC_KEY); final File tmpDir = Paths.get(tmpDirLoc).toFile(); - final DataSegment segment = ((DataSegmentSplit) split).getDataSegment(); + final DataSegment segment = Iterables.getOnlyElement(((DatasourceInputSplit) split).getSegments()); final HadoopDruidConverterConfig config = converterConfigFromConfiguration(context.getConfiguration()); context.setStatus("DOWNLOADING"); context.progress(); - final Path inPath = new Path(getURIFromSegment(segment)); + final Path inPath = new Path(JobHelper.getURIFromSegment(segment)); final File inDir = new File(tmpDir, "in"); if (inDir.exists() && !inDir.delete()) { @@ -559,38 +554,6 @@ protected void setup(Context context) throws IOException, InterruptedException context.getConfiguration().set(TMP_FILE_LOC_KEY, tmpFile.getAbsolutePath()); } - private static URI getURIFromSegment(DataSegment dataSegment) - { - // There is no good way around this... - // TODO: add getURI() to URIDataPuller - final Map loadSpec = dataSegment.getLoadSpec(); - final String type = loadSpec.get("type").toString(); - final URI segmentLocURI; - if ("s3_zip".equals(type)) { - segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key"))); - } else if ("hdfs".equals(type)) { - segmentLocURI = URI.create(loadSpec.get("path").toString()); - } else if ("local".equals(type)) { - try { - segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null); - } - catch (URISyntaxException e) { - throw new ISE(e, "Unable to form simple file uri"); - } - } else { - try { - throw new IAE( - "Cannot figure out loadSpec %s", - HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec) - ); - } - catch (JsonProcessingException e) { - throw new ISE("Cannot write Map with json mapper"); - } - } - return segmentLocURI; - } - @Override protected void cleanup( Context context @@ -604,50 +567,6 @@ protected void cleanup( } } - public static class DataSegmentSplit extends InputSplit implements Writable - { - private DataSegment dataSegment = null; - - public DataSegmentSplit() - { - // For serialization purposes - } - - public DataSegmentSplit(@NotNull DataSegment dataSegment) - { - this.dataSegment = Preconditions.checkNotNull(dataSegment, "dataSegment"); - } - - @Override - public long getLength() throws IOException, InterruptedException - { - return dataSegment.getSize(); - } - - @Override - public String[] getLocations() throws IOException, InterruptedException - { - return new String[]{}; - } - - protected DataSegment getDataSegment() - { - return dataSegment; - } - - @Override - public void write(DataOutput out) throws IOException - { - out.write(HadoopDruidConverterConfig.jsonMapper.writeValueAsString(dataSegment).getBytes()); - } - - @Override - public void readFields(DataInput in) throws IOException - { - dataSegment = HadoopDruidConverterConfig.jsonMapper.readValue(in.readLine(), DataSegment.class); - } - } - public static class ConfigInputFormat extends InputFormat { @Override @@ -665,7 +584,7 @@ public List getSplits(final JobContext jobContext) throws IOExceptio @Override public InputSplit apply(DataSegment input) { - return new DataSegmentSplit(input); + return new DatasourceInputSplit(ImmutableList.of(input)); } } ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java new file mode 100644 index 000000000000..702047ee12a1 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceIngestionSpecTest.java @@ -0,0 +1,52 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.filter.SelectorDimFilter; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class DatasourceIngestionSpecTest +{ + @Test + public void testSerde() throws Exception + { + DatasourceIngestionSpec expected = new DatasourceIngestionSpec( + "test", + Interval.parse("2014/2015"), + new SelectorDimFilter("dim", "value"), + QueryGranularity.DAY, + Lists.newArrayList("d1", "d2"), + Lists.newArrayList("m1", "m2", "m3") + ); + + ObjectMapper jsonMapper = new DefaultObjectMapper(); + + DatasourceIngestionSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), DatasourceIngestionSpec.class); + Assert.assertEquals(expected, actual); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java new file mode 100644 index 000000000000..fe3802eccb0f --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java @@ -0,0 +1,154 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class DatasourceInputFormatTest +{ + private List segments; + private Configuration config; + private JobContext context; + + @Before + public void setUp() throws Exception + { + segments = ImmutableList.of( + new DataSegment( + "test1", + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index1.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 2 + ), + new DataSegment( + "test2", + Interval.parse("2050/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index2.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 11 + ), + new DataSegment( + "test3", + Interval.parse("2030/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index3.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 4 + ) + ); + + config = new Configuration(); + config.set( + DatasourceInputFormat.CONF_INPUT_SEGMENTS, + new DefaultObjectMapper().writeValueAsString(segments) + ); + + context = EasyMock.createMock(JobContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(config); + EasyMock.replay(context); + } + + @Test + public void testGetSplitsNoCombining() throws Exception + { + List splits = new DatasourceInputFormat().getSplits(context); + + Assert.assertEquals(segments.size(), splits.size()); + for (int i = 0; i < segments.size(); i++) { + Assert.assertEquals(segments.get(i), ((DatasourceInputSplit) splits.get(i)).getSegments().get(0)); + } + } + + @Test + public void testGetSplitsAllCombined() throws Exception + { + config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999"); + List splits = new DatasourceInputFormat().getSplits(context); + + Assert.assertEquals(1, splits.size()); + Assert.assertEquals( + Sets.newHashSet(segments), + Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) + ); + } + + @Test + public void testGetSplitsCombineInTwo() throws Exception + { + config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6"); + List splits = new DatasourceInputFormat().getSplits(context); + + Assert.assertEquals(2, splits.size()); + + Assert.assertEquals( + Sets.newHashSet(segments.get(0), segments.get(2)), + Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments())) + ); + + Assert.assertEquals( + Sets.newHashSet(segments.get(1)), + Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments())) + ); + } + + @Test + public void testGetRecordReader() throws Exception + { + Assert.assertTrue(new DatasourceInputFormat().createRecordReader(null, null) instanceof DatasourceRecordReader); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java new file mode 100644 index 000000000000..87872339cfe5 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java @@ -0,0 +1,71 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInput; + +/** + */ +public class DatasourceInputSplitTest +{ + @Test + public void testSerde() throws Exception + { + DatasourceInputSplit expected = new DatasourceInputSplit( + Lists.newArrayList( + new DataSegment( + "test", + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12334 + ) + ) + ); + + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + expected.write(out); + + DataInput in = ByteStreams.newDataInput(out.toByteArray()); + DatasourceInputSplit actual = new DatasourceInputSplit(); + actual.readFields(in); + + Assert.assertEquals(expected.getSegments(), actual.getSegments()); + Assert.assertEquals(12334, actual.getLength()); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java new file mode 100644 index 000000000000..c4c25f15d4f2 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java @@ -0,0 +1,136 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.druid.data.input.InputRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.timeline.DataSegment; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +/** + */ +public class DatasourceRecordReaderTest +{ + @Test + public void testSanity() throws Exception + { + DataSegment segment = new DefaultObjectMapper() + .readValue(this.getClass().getClassLoader().getResource("test-segment/descriptor.json"), DataSegment.class) + .withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath() + ) + ); + InputSplit split = new DatasourceInputSplit(Lists.newArrayList(segment)); + + Configuration config = new Configuration(); + config.set( + DatasourceInputFormat.CONF_DRUID_SCHEMA, + HadoopDruidIndexerConfig.jsonMapper.writeValueAsString( + new DatasourceIngestionSpec( + segment.getDataSource(), + segment.getInterval(), + null, + null, + segment.getDimensions(), + segment.getMetrics() + ) + ) + ); + + TaskAttemptContext context = EasyMock.createNiceMock(TaskAttemptContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(config).anyTimes(); + EasyMock.replay(context); + + DatasourceRecordReader rr = new DatasourceRecordReader(); + rr.initialize(split, context); + + Assert.assertEquals(0, rr.getProgress(), 0.0001); + + List rows = Lists.newArrayList(); + while(rr.nextKeyValue()) { + rows.add(rr.getCurrentValue()); + } + verifyRows(rows); + + Assert.assertEquals(1, rr.getProgress(), 0.0001); + + rr.close(); + } + + private void verifyRows(List actualRows) + { + List> expectedRows = ImmutableList.of( + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T00:00:00.000Z"), + "host", ImmutableList.of("a.example.com"), + "visited_sum", 100L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T01:00:00.000Z"), + "host", ImmutableList.of("b.example.com"), + "visited_sum", 150L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T02:00:00.000Z"), + "host", ImmutableList.of("c.example.com"), + "visited_sum", 200L, + "unique_hosts", 1.0d + ) + ); + + Assert.assertEquals(expectedRows.size(), actualRows.size()); + + for (int i = 0; i < expectedRows.size(); i++) { + Map expected = expectedRows.get(i); + InputRow actual = actualRows.get(i); + + Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions()); + + Assert.assertEquals(expected.get("time"), actual.getTimestamp()); + Assert.assertEquals(expected.get("host"), actual.getDimension("host")); + Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum")); + Assert.assertEquals( + (Double) expected.get("unique_hosts"), + (Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")), + 0.001 + ); + } + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java new file mode 100644 index 000000000000..304006c8de97 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -0,0 +1,229 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.metamx.common.Granularity; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.guice.GuiceInjectors; +import io.druid.guice.JsonConfigProvider; +import io.druid.guice.annotations.Self; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopIOConfig; +import io.druid.indexer.HadoopIngestionSpec; +import io.druid.indexer.HadoopTuningConfig; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.hadoop.DatasourceInputFormat; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.initialization.Initialization; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.server.DruidNode; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class DatasourcePathSpecTest +{ + private DatasourceIngestionSpec ingestionSpec; + + public DatasourcePathSpecTest() + { + this.ingestionSpec = new DatasourceIngestionSpec( + "test", + Interval.parse("2000/3000"), + null, + null, + null, + null + ); + } + + @Test + public void testSerde() throws Exception + { + final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock( + IndexerMetadataStorageCoordinator.class + ); + + Injector injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(IndexerMetadataStorageCoordinator.class).toInstance(indexerMetadataStorageCoordinator); + JsonConfigProvider.bindInstance( + binder, Key.get(DruidNode.class, Self.class), new DruidNode("dummy-node", null, null) + ); + } + } + ) + ); + + ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class); + + DatasourcePathSpec expected = new DatasourcePathSpec( + indexerMetadataStorageCoordinator, + jsonMapper, + ingestionSpec, + Long.valueOf(10) + ); + PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + + expected = new DatasourcePathSpec( + indexerMetadataStorageCoordinator, + jsonMapper, + ingestionSpec, + null + ); + actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + } + + @Test + public void testAddInputPaths() throws Exception + { + HadoopDruidIndexerConfig hadoopIndexerConfig = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + ingestionSpec.getDataSource(), + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(null, null, null), + null, + ImmutableList.of("timestamp", "host", "visited") + ) + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_sum", "visited") + }, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(Interval.parse("2000/3000")) + ) + ), + new HadoopIOConfig( + ImmutableMap.of( + "paths", + "/tmp/dummy", + "type", + "static" + ), + null, + "/tmp/dummy" + ), + HadoopTuningConfig.makeDefaultTuningConfig().withWorkingPath("/tmp/work").withVersion("ver") + ) + ); + + List segments = ImmutableList.of( + new DataSegment( + ingestionSpec.getDataSource(), + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index.zip" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12334 + ), + new DataSegment( + ingestionSpec.getDataSource(), + Interval.parse("2050/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index.zip" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12335 + ) + ); + + IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock(IndexerMetadataStorageCoordinator.class); + EasyMock.expect(indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(ingestionSpec.getDataSource(), ingestionSpec.getInterval())).andReturn(segments); + EasyMock.replay(indexerMetadataStorageCoordinator); + + ObjectMapper mapper = new DefaultObjectMapper(); + + DatasourcePathSpec pathSpec = new DatasourcePathSpec( + indexerMetadataStorageCoordinator, + mapper, + ingestionSpec, + null + ); + + Configuration config = new Configuration(); + Job job = EasyMock.createNiceMock(Job.class); + EasyMock.expect(job.getConfiguration()).andReturn(config).anyTimes(); + EasyMock.replay(job); + + pathSpec.addInputPaths(hadoopIndexerConfig, job); + List actualSegments = mapper.readValue( + config.get(DatasourceInputFormat.CONF_INPUT_SEGMENTS), + new TypeReference>() + { + } + ); + + Assert.assertEquals(segments, actualSegments); + + DatasourceIngestionSpec actualIngestionSpec = mapper.readValue(config.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), DatasourceIngestionSpec.class); + Assert.assertEquals(ingestionSpec + .withDimensions(ImmutableList.of("product")) + .withMetrics(ImmutableList.of("visited_sum")), + actualIngestionSpec); + } +} diff --git a/indexing-hadoop/src/test/resources/test-segment/descriptor.json b/indexing-hadoop/src/test/resources/test-segment/descriptor.json new file mode 100644 index 000000000000..f892b765f555 --- /dev/null +++ b/indexing-hadoop/src/test/resources/test-segment/descriptor.json @@ -0,0 +1,17 @@ +{ + "binaryVersion": 9, + "dataSource": "testds", + "dimensions": "host", + "identifier": "testds_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-07-15T22:02:40.171Z", + "interval": "2014-10-22T00:00:00.000Z/2014-10-23T00:00:00.000Z", + "loadSpec": { + "path": "test-segment/index.zip", + "type": "local" + }, + "metrics": "visited_sum,unique_hosts", + "shardSpec": { + "type": "none" + }, + "size": 4096, + "version": "2015-07-15T22:02:40.171Z" +} diff --git a/indexing-hadoop/src/test/resources/test-segment/index.zip b/indexing-hadoop/src/test/resources/test-segment/index.zip new file mode 100644 index 0000000000000000000000000000000000000000..f9e15b415ff5c90e725b374f6f5e4b656d1ebe6d GIT binary patch literal 1019 zcmWIWW@Zs#U|`^22q^vN#+9h}@E{`t!xAzpH^-9y(f#pBAm2>ZMTC<=tiU`BJRWWN)L-pM|=$8h1@L@-7nQ zZVK)W3c4#a+51ytQ&_>*lRqm@&OEx(YpX-)RMnOL99&%;66McN5RkuK5pFN;?{2^J z){Q7NHid)~7NK(vXLyb@Ijmtha?Ih(>=o0aCeGk4nIBhFr{|%-8LJ=}+0ZGN!=Ze_ zLqL($NT#EuVaECSjT1Rf{I)-Ia54j1#f)r+!*iHBSQ{dkIwGYGB(g1=xxj>bf}gvu z%ejC1812>xE?KISAi@|L%3#{a@<*uHu}RV)EifZ4EiqF|!?oc}+J+{p9{!Hhh6wJC zof3+C$y1Wt9vPlCR6M=0Y0t!_S0VECLBZ92b#c3d1U1E92(V@Z1-Gb~dMb#8dNFQl zU0!JLV!niL;s5oHL3bEjd&EqV8dA9y-*ID;Ee-Zp*zTy+*L<*}u~>_p`F{q_Tz4(u zGZRm-1XM2I(_%LI)a~71;;(5j`&jdiqYifEk!gi#o>^Wsg_(L0n;BF#9GsOnYeB=s z%OzUs9)28~CowPtpd^C?b#u!YV4^4iVs2nE$W1LtBq9~4o-yQWP!MprSa*JtVGmRH z@`a5@bgyV6*z8;JcJ}<82b9;`FsRh#>Du^e)=h&42MdfYZ<>DV;0A7myFvTfOO~0x z)OPM!xb5Y|qMFm6+I8}lcz0i$la!f#UGKiOEy#gfK<5CR3QVUv(q}}l0NEf6a$s3% zQE_H|o?cRB9=>D>RLvRS&B!Fjj4R1Y0F^+%mPQbbngLiL82~Nuqnn5tdJq$Vd5U36 qBRdlVYFOek5SMR}owx#MAd25{8ORFE& Date: Mon, 18 May 2015 13:41:06 -0500 Subject: [PATCH 4/7] add ability to specify Multiple PathSpecs in batch ingestion, so that we can grab data from multiple places in same ingestion Conflicts: indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java Conflicts: indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java --- .../indexer/DetermineHashedPartitionsJob.java | 1 - .../druid/indexer/DeterminePartitionsJob.java | 2 - .../indexer/HadoopDruidIndexerConfig.java | 6 -- .../io/druid/indexer/IndexGeneratorJob.java | 2 - .../main/java/io/druid/indexer/JobHelper.java | 13 --- .../indexer/path/DatasourcePathSpec.java | 7 +- .../indexer/path/GranularityPathSpec.java | 5 +- .../druid/indexer/path/MultiplePathSpec.java | 80 +++++++++++++++++++ .../java/io/druid/indexer/path/PathSpec.java | 6 +- .../io/druid/indexer/path/StaticPathSpec.java | 60 +++++++++++++- .../indexer/path/GranularityPathSpecTest.java | 12 +-- .../indexer/path/MultiplePathSpecTest.java | 65 +++++++++++++++ .../indexer/path/StaticPathSpecTest.java | 33 +++++--- 13 files changed, 235 insertions(+), 57 deletions(-) create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index e217b82ee3ec..74e9b6295be0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -88,7 +88,6 @@ public boolean run() ); JobHelper.injectSystemProperties(groupByJob); - JobHelper.setInputFormat(groupByJob, config); groupByJob.setMapperClass(DetermineCardinalityMapper.class); groupByJob.setMapOutputKeyClass(LongWritable.class); groupByJob.setMapOutputValueClass(BytesWritable.class); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java index a21e5f7437b3..0060303888f9 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java @@ -126,7 +126,6 @@ public boolean run() ); JobHelper.injectSystemProperties(groupByJob); - JobHelper.setInputFormat(groupByJob, config); groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class); groupByJob.setMapOutputKeyClass(BytesWritable.class); groupByJob.setMapOutputValueClass(NullWritable.class); @@ -173,7 +172,6 @@ public boolean run() } else { // Directly read the source data, since we assume it's already grouped. dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class); - JobHelper.setInputFormat(dimSelectionJob, config); config.addInputPaths(dimSelectionJob); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 59f187fb8a23..19d3a7a606da 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -55,7 +55,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -354,11 +353,6 @@ public Job addInputPaths(Job job) throws IOException return pathSpec.addInputPaths(this, job); } - public Class getInputFormatClass() - { - return pathSpec.getInputFormat(); - } - /******************************************** Granularity/Bucket Helper Methods ********************************************/ diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 08d852046fff..f8a3b7be9b47 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -140,8 +140,6 @@ public boolean run() JobHelper.injectSystemProperties(job); - JobHelper.setInputFormat(job, config); - job.setMapperClass(IndexGeneratorMapper.class); job.setMapOutputValueClass(BytesWritable.class); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java index 541f561fc48f..a9c282ddfadd 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java @@ -44,8 +44,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.Progressable; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -207,17 +205,6 @@ public static boolean runJobs(List jobs, HadoopDruidIndexerConfig config) return true; } - public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig) - { - if (indexerConfig.getInputFormatClass() != null) { - job.setInputFormatClass(indexerConfig.getInputFormatClass()); - } else if (indexerConfig.isCombineText()) { - job.setInputFormatClass(CombineTextInputFormat.class); - } else { - job.setInputFormatClass(TextInputFormat.class); - } - } - public static DataSegment serializeOutIndex( final DataSegment segmentTemplate, final Configuration configuration, diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java index 6d912eeb65c1..536a2fc822c0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -35,7 +35,6 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.timeline.DataSegment; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; @@ -146,15 +145,11 @@ public Iterable apply(DataSegment dataSegment) job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(ingestionSpec)); job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments)); job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize)); + MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class); return job; } - public Class getInputFormat() - { - return DatasourceInputFormat.class; - } - @Override public boolean equals(Object o) { diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java index 14f8db615f90..e793827b65b4 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java @@ -30,7 +30,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.format.DateTimeFormat; @@ -152,7 +155,7 @@ public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOExce for (String path : paths) { log.info("Appending path[%s]", path); - FileInputFormat.addInputPath(job, new Path(path)); + StaticPathSpec.addToMultipleInputs(config, job, path, inputFormat); } return job; diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java new file mode 100644 index 000000000000..7c8808f00e7e --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/MultiplePathSpec.java @@ -0,0 +1,80 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import io.druid.indexer.HadoopDruidIndexerConfig; +import org.apache.hadoop.mapreduce.Job; + +import java.io.IOException; +import java.util.List; + +public class MultiplePathSpec implements PathSpec +{ + private List children; + + public MultiplePathSpec( + @JsonProperty("children") List children + ) + { + Preconditions.checkArgument(children != null && children.size() > 0, "Null/Empty list of child PathSpecs"); + this.children = children; + } + + @JsonProperty + public List getChildren() + { + return children; + } + + @Override + public Job addInputPaths( + HadoopDruidIndexerConfig config, Job job + ) throws IOException + { + for(PathSpec spec : children) { + spec.addInputPaths(config, job); + } + return job; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MultiplePathSpec that = (MultiplePathSpec) o; + + return children.equals(that.children); + + } + + @Override + public int hashCode() + { + return children.hashCode(); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java index f1de51316892..f7433fc0cc77 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java @@ -20,8 +20,6 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.indexer.HadoopDruidIndexerConfig; - -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import java.io.IOException; @@ -33,10 +31,10 @@ @JsonSubTypes.Type(name="granular_unprocessed", value=GranularUnprocessedPathSpec.class), @JsonSubTypes.Type(name="granularity", value=GranularityPathSpec.class), @JsonSubTypes.Type(name="static", value=StaticPathSpec.class), - @JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class) + @JsonSubTypes.Type(name="dataSource", value=DatasourcePathSpec.class), + @JsonSubTypes.Type(name="multi", value=MultiplePathSpec.class) }) public interface PathSpec { public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException; - public Class getInputFormat(); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java index c6cc63d794a3..b0f5e19e188e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/StaticPathSpec.java @@ -20,10 +20,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.metamx.common.logger.Logger; import io.druid.indexer.HadoopDruidIndexerConfig; - +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import java.io.IOException; @@ -60,7 +62,9 @@ public StaticPathSpec( public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException { log.info("Adding paths[%s]", paths); - FileInputFormat.addInputPaths(job, paths); + + addToMultipleInputs(config, job, paths, inputFormat); + return job; } @@ -68,4 +72,54 @@ public Class getInputFormat() { return inputFormat; } + + public String getPaths() + { + return paths; + } + + public final static void addToMultipleInputs( + HadoopDruidIndexerConfig config, + Job job, + String path, + Class inputFormatClass + ) + { + if (inputFormatClass == null) { + if (config.isCombineText()) { + MultipleInputs.addInputPath(job, new Path(path), CombineTextInputFormat.class); + } else { + MultipleInputs.addInputPath(job, new Path(path), TextInputFormat.class); + } + } else { + MultipleInputs.addInputPath(job, new Path(path), inputFormatClass); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StaticPathSpec that = (StaticPathSpec) o; + + if (paths != null ? !paths.equals(that.paths) : that.paths != null) { + return false; + } + return !(inputFormat != null ? !inputFormat.equals(that.inputFormat) : that.inputFormat != null); + + } + + @Override + public int hashCode() + { + int result = paths != null ? paths.hashCode() : 0; + result = 31 * result + (inputFormat != null ? inputFormat.hashCode() : 0); + return result; + } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index a7c5fc2033aa..fcad18866ffe 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -76,18 +76,18 @@ public class GranularityPathSpecTest } @Test - public void testDeserialization() throws Exception + public void testSerdeCustomInputFormat() throws Exception { - testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class); + testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, TextInputFormat.class); } @Test - public void testDeserializationNoInputFormat() throws Exception + public void testSerdeNoInputFormat() throws Exception { - testDeserialization("/test/path", "*.test", "pat_pat", Granularity.SECOND, null); + testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, null); } - private void testDeserialization( + private void testSerde( String inputPath, String filePattern, String pathFormat, @@ -114,7 +114,7 @@ private void testDeserialization( } sb.append("\"type\" : \"granularity\"}"); - GranularityPathSpec pathSpec = (GranularityPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class); + GranularityPathSpec pathSpec = (GranularityPathSpec) StaticPathSpecTest.readWriteRead(sb.toString(), jsonMapper); Assert.assertEquals(inputFormat, pathSpec.getInputFormat()); Assert.assertEquals(inputPath, pathSpec.getInputPath()); Assert.assertEquals(filePattern, pathSpec.getFilePattern()); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java new file mode 100644 index 000000000000..168d41e1953a --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/MultiplePathSpecTest.java @@ -0,0 +1,65 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import io.druid.jackson.DefaultObjectMapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class MultiplePathSpecTest +{ + @Test + public void testSerde() throws Exception + { + PathSpec expected = new MultiplePathSpec( + Lists.newArrayList( + new StaticPathSpec("/tmp/path1", null), + new StaticPathSpec("/tmp/path2", TextInputFormat.class) + ) + ); + + ObjectMapper jsonMapper = new DefaultObjectMapper(); + + PathSpec actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + } + + @Test + public void testAddInputPaths() throws Exception + { + PathSpec ps1 = EasyMock.createMock(PathSpec.class); + EasyMock.expect(ps1.addInputPaths(null, null)).andReturn(null); + + PathSpec ps2 = EasyMock.createMock(PathSpec.class); + EasyMock.expect(ps2.addInputPaths(null, null)).andReturn(null); + + EasyMock.replay(ps1, ps2); + + new MultiplePathSpec(Lists.newArrayList(ps1, ps2)).addInputPaths(null, null); + + EasyMock.verify(ps1, ps2); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java index c8bf9a8575ad..b654d7849f0b 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/StaticPathSpecTest.java @@ -21,8 +21,6 @@ import io.druid.jackson.DefaultObjectMapper; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.junit.Assert; import org.junit.Test; @@ -34,18 +32,18 @@ public class StaticPathSpecTest private final ObjectMapper jsonMapper = new DefaultObjectMapper(); @Test - public void testDeserialization() throws Exception + public void testSerdeCustomInputFormat() throws Exception { - testDeserialization("/sample/path", TextInputFormat.class); + testSerde("/sample/path", TextInputFormat.class); } @Test public void testDeserializationNoInputFormat() throws Exception { - testDeserialization("/sample/path", null); + testSerde("/sample/path", null); } - private void testDeserialization(String path, Class inputFormat) throws Exception + private void testSerde(String path, Class inputFormat) throws Exception { StringBuilder sb = new StringBuilder(); sb.append("{\"paths\" : \""); @@ -57,13 +55,22 @@ private void testDeserialization(String path, Class inputFormat) throws Exceptio sb.append("\","); } sb.append("\"type\" : \"static\"}"); - StaticPathSpec pathSpec = (StaticPathSpec)jsonMapper.readValue(sb.toString(), PathSpec.class); + + StaticPathSpec pathSpec = (StaticPathSpec) readWriteRead(sb.toString(), jsonMapper); Assert.assertEquals(inputFormat, pathSpec.getInputFormat()); - - Job job = Job.getInstance(); - pathSpec.addInputPaths(null, job); - Assert.assertEquals( - "file:" + path, - job.getConfiguration().get(FileInputFormat.INPUT_DIR)); + Assert.assertEquals(path, pathSpec.getPaths()); + } + + public static final PathSpec readWriteRead(String jsonStr, ObjectMapper jsonMapper) throws Exception + { + return jsonMapper.readValue( + jsonMapper.writeValueAsString( + jsonMapper.readValue( + jsonStr, + PathSpec.class + ) + ), + PathSpec.class + ); } } From 15fa43dd43131e594cd011ee2805f689d4d1a097 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 12 Aug 2015 17:32:35 -0500 Subject: [PATCH 5/7] changing DatasourcePathSpec, to get segment list, so that hadoop indexer uses overlord action to get list of segments and passes when running as an overlord task. and, uses metadata store directly when running as standalone hadoop indexer also, serialized list of segments is passed to DatasourcePathSpec so that hadoop classloader issues do not creep up --- .../indexer/HadoopDruidIndexerConfig.java | 7 + .../io/druid/indexer/HadoopIngestionSpec.java | 49 ++++++ .../indexer/path/DatasourcePathSpec.java | 45 ++--- .../MetadataStoreBasedUsedSegmentLister.java | 53 ++++++ .../druid/indexer/path/UsedSegmentLister.java | 44 +++++ .../MetadataStorageUpdaterJobSpec.java | 17 ++ ...cUpdateDatasourcePathSpecSegmentsTest.java | 154 ++++++++++++++++++ .../indexer/path/DatasourcePathSpecTest.java | 88 +++++----- .../indexing/common/task/HadoopIndexTask.java | 22 ++- .../OverlordActionBasedUsedSegmentLister.java | 54 ++++++ .../indexing/common/task/TaskSerdeTest.java | 8 +- .../druid/cli/CliInternalHadoopIndexer.java | 28 +++- 12 files changed, 504 insertions(+), 65 deletions(-) create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/path/MetadataStoreBasedUsedSegmentLister.java create mode 100644 indexing-hadoop/src/main/java/io/druid/indexer/path/UsedSegmentLister.java create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java create mode 100644 indexing-service/src/main/java/io/druid/indexing/hadoop/OverlordActionBasedUsedSegmentLister.java diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 19d3a7a606da..3a5409453fe6 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -18,6 +18,7 @@ package io.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -247,6 +248,12 @@ public HadoopIngestionSpec getSchema() return schema; } + @JsonIgnore + public PathSpec getPathSpec() + { + return pathSpec; + } + public String getDataSource() { return schema.getDataSchema().getDataSource(); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java index 4721b6f5f2c1..ea972eea7b04 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIngestionSpec.java @@ -19,8 +19,16 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.path.UsedSegmentLister; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.IngestionSpec; +import io.druid.timeline.DataSegment; + +import java.io.IOException; +import java.util.List; +import java.util.Map; /** */ @@ -91,4 +99,45 @@ public HadoopIngestionSpec withTuningConfig(HadoopTuningConfig config) config ); } + + public static HadoopIngestionSpec updateSegmentListIfDatasourcePathSpecIsUsed( + HadoopIngestionSpec spec, + ObjectMapper jsonMapper, + UsedSegmentLister segmentLister + ) + throws IOException + { + String dataSource = "dataSource"; + String type = "type"; + String multi = "multi"; + String children = "children"; + String segments = "segments"; + String ingestionSpec = "ingestionSpec"; + + Map pathSpec = spec.getIOConfig().getPathSpec(); + Map datasourcePathSpec = null; + if(pathSpec.get(type).equals(dataSource)) { + datasourcePathSpec = pathSpec; + } else if(pathSpec.get(type).equals(multi)) { + List> childPathSpecs = (List>) pathSpec.get(children); + for(Map childPathSpec : childPathSpecs) { + if (childPathSpec.get(type).equals(dataSource)) { + datasourcePathSpec = childPathSpec; + break; + } + } + } + if (datasourcePathSpec != null) { + Map ingestionSpecMap = (Map) datasourcePathSpec.get(ingestionSpec); + DatasourceIngestionSpec ingestionSpecObj = jsonMapper.convertValue(ingestionSpecMap, DatasourceIngestionSpec.class); + List segmentsList = segmentLister.getUsedSegmentsForInterval( + ingestionSpecObj.getDataSource(), + ingestionSpecObj.getInterval() + ); + datasourcePathSpec.put(segments, segmentsList); + } + + return spec; + } + } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java index 536a2fc822c0..90d09248862e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -22,8 +22,8 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.api.client.repackaged.com.google.common.base.Preconditions; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -31,7 +31,6 @@ import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.hadoop.DatasourceIngestionSpec; import io.druid.indexer.hadoop.DatasourceInputFormat; -import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.timeline.DataSegment; import org.apache.hadoop.fs.Path; @@ -46,21 +45,20 @@ public class DatasourcePathSpec implements PathSpec { private static final Logger logger = new Logger(DatasourcePathSpec.class); - private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; - private ObjectMapper mapper; - - private DatasourceIngestionSpec ingestionSpec; - private long maxSplitSize; + private final ObjectMapper mapper; + private final DatasourceIngestionSpec ingestionSpec; + private final long maxSplitSize; + private final List segments; public DatasourcePathSpec( - @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, @JacksonInject ObjectMapper mapper, + @JsonProperty("segments") List segments, @JsonProperty("ingestionSpec") DatasourceIngestionSpec spec, @JsonProperty("maxSplitSize") Long maxSplitSize ) { - this.indexerMetadataStorageCoordinator = Preconditions.checkNotNull(indexerMetadataStorageCoordinator, "null indexerMetadataStorageCoordinator"); this.mapper = Preconditions.checkNotNull(mapper, "null mapper"); + this.segments = segments; this.ingestionSpec = Preconditions.checkNotNull(spec, "null ingestionSpec"); if(maxSplitSize == null) { @@ -70,6 +68,12 @@ public DatasourcePathSpec( } } + @JsonProperty + public List getSegments() + { + return segments; + } + @JsonProperty public DatasourceIngestionSpec getIngestionSpec() { @@ -87,10 +91,8 @@ public Job addInputPaths( HadoopDruidIndexerConfig config, Job job ) throws IOException { - final List segments = indexerMetadataStorageCoordinator.getUsedSegmentsForInterval( - ingestionSpec.getDataSource(), - ingestionSpec.getInterval() - ); + Preconditions.checkArgument(segments != null && !segments.isEmpty(), "no segments provided"); + logger.info( "Found total [%d] segments for [%s] in interval [%s]", segments.size(), @@ -98,7 +100,8 @@ public Job addInputPaths( ingestionSpec.getInterval() ); - if (ingestionSpec.getDimensions() == null) { + DatasourceIngestionSpec updatedIngestionSpec = ingestionSpec; + if (updatedIngestionSpec.getDimensions() == null) { List dims; if (config.getParser().getParseSpec().getDimensionsSpec().hasCustomDimensions()) { dims = config.getParser().getParseSpec().getDimensionsSpec().getDimensions(); @@ -128,10 +131,10 @@ public Iterable apply(DataSegment dataSegment) ) ); } - ingestionSpec = ingestionSpec.withDimensions(dims); + updatedIngestionSpec = updatedIngestionSpec.withDimensions(dims); } - if (ingestionSpec.getMetrics() == null) { + if (updatedIngestionSpec.getMetrics() == null) { Set metrics = Sets.newHashSet(); final AggregatorFactory[] cols = config.getSchema().getDataSchema().getAggregators(); if (cols != null) { @@ -139,10 +142,10 @@ public Iterable apply(DataSegment dataSegment) metrics.add(col.getName()); } } - ingestionSpec = ingestionSpec.withMetrics(Lists.newArrayList(metrics)); + updatedIngestionSpec = updatedIngestionSpec.withMetrics(Lists.newArrayList(metrics)); } - job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(ingestionSpec)); + job.getConfiguration().set(DatasourceInputFormat.CONF_DRUID_SCHEMA, mapper.writeValueAsString(updatedIngestionSpec)); job.getConfiguration().set(DatasourceInputFormat.CONF_INPUT_SEGMENTS, mapper.writeValueAsString(segments)); job.getConfiguration().set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, String.valueOf(maxSplitSize)); MultipleInputs.addInputPath(job, new Path("/dummy/tobe/ignored"), DatasourceInputFormat.class); @@ -165,7 +168,10 @@ public boolean equals(Object o) if (maxSplitSize != that.maxSplitSize) { return false; } - return ingestionSpec.equals(that.ingestionSpec); + if (!ingestionSpec.equals(that.ingestionSpec)) { + return false; + } + return !(segments != null ? !segments.equals(that.segments) : that.segments != null); } @@ -174,6 +180,7 @@ public int hashCode() { int result = ingestionSpec.hashCode(); result = 31 * result + (int) (maxSplitSize ^ (maxSplitSize >>> 32)); + result = 31 * result + (segments != null ? segments.hashCode() : 0); return result; } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/MetadataStoreBasedUsedSegmentLister.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/MetadataStoreBasedUsedSegmentLister.java new file mode 100644 index 000000000000..f39a49bcc0bd --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/MetadataStoreBasedUsedSegmentLister.java @@ -0,0 +1,53 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import com.google.common.base.Preconditions; +import com.google.inject.Inject; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.io.IOException; +import java.util.List; + +/** + */ +public class MetadataStoreBasedUsedSegmentLister implements UsedSegmentLister +{ + private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; + + @Inject + public MetadataStoreBasedUsedSegmentLister(IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator) + { + this.indexerMetadataStorageCoordinator = Preconditions.checkNotNull( + indexerMetadataStorageCoordinator, + "null indexerMetadataStorageCoordinator" + ); + } + + @Override + public List getUsedSegmentsForInterval( + String dataSource, Interval interval + ) throws IOException + { + return indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(dataSource, interval); + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/UsedSegmentLister.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/UsedSegmentLister.java new file mode 100644 index 000000000000..186c4563d401 --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/UsedSegmentLister.java @@ -0,0 +1,44 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer.path; + +import io.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.io.IOException; +import java.util.List; + +/** + */ +public interface UsedSegmentLister +{ + /** + * Get all segments which may include any data in the interval and are flagged as used. + * + * @param dataSource The datasource to query + * @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive + * + * @return The DataSegments which include data in the requested interval. These segments may contain data outside the requested interval. + * + * @throws IOException + */ + public List getUsedSegmentsForInterval(final String dataSource, final Interval interval) + throws IOException; +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java index ee8ae08f0d4e..4595c1affbda 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/MetadataStorageUpdaterJobSpec.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Supplier; import io.druid.metadata.MetadataStorageConnectorConfig; +import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.PasswordProvider; import javax.validation.constraints.NotNull; @@ -78,4 +79,20 @@ public String getPassword() } }; } + + //Note: Currently it only supports configured segmentTable, other tables should be added if needed + //by the code using this + public MetadataStorageTablesConfig getMetadataStorageTablesConfig() + { + return new MetadataStorageTablesConfig( + null, + segmentTable, + null, + null, + null, + null, + null, + null + ); + } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java new file mode 100644 index 000000000000..646cd3d9f617 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest.java @@ -0,0 +1,154 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.metamx.common.Granularity; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.path.DatasourcePathSpec; +import io.druid.indexer.path.MultiplePathSpec; +import io.druid.indexer.path.PathSpec; +import io.druid.indexer.path.StaticPathSpec; +import io.druid.indexer.path.UsedSegmentLister; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +/** + */ +public class HadoopIngestionSpecUpdateDatasourcePathSpecSegmentsTest +{ + private final String testDatasource = "test"; + private final Interval testDatasourceInterval = new Interval("1970/2000"); + + private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + + private final List segments = ImmutableList.of( + new DataSegment( + "test1", + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index1.zip" + ), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 2 + ) + ); + + @Test + public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithNoDatasourcePathSpec() throws Exception + { + PathSpec pathSpec = new StaticPathSpec("/xyz", null); + HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec); + Assert.assertTrue(config.getPathSpec() instanceof StaticPathSpec); + } + + @Test + public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithJustDatasourcePathSpec() throws Exception + { + PathSpec pathSpec = new DatasourcePathSpec( + jsonMapper, + null, + new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null), + null + ); + HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec); + Assert.assertEquals(segments, ((DatasourcePathSpec) config.getPathSpec()).getSegments()); + } + + @Test + public void testupdateSegmentListIfDatasourcePathSpecIsUsedWithMultiplePathSpec() throws Exception + { + PathSpec pathSpec = new MultiplePathSpec( + ImmutableList.of( + new StaticPathSpec("/xyz", null), + new DatasourcePathSpec( + jsonMapper, + null, + new DatasourceIngestionSpec(testDatasource, testDatasourceInterval, null, null, null, null), + null + ) + ) + ); + HadoopDruidIndexerConfig config = testRunUpdateSegmentListIfDatasourcePathSpecIsUsed(pathSpec); + Assert.assertEquals( + segments, + ((DatasourcePathSpec) ((MultiplePathSpec) config.getPathSpec()).getChildren().get(1)).getSegments() + ); + } + + private HadoopDruidIndexerConfig testRunUpdateSegmentListIfDatasourcePathSpecIsUsed( + PathSpec datasourcePathSpec + ) + throws Exception + { + HadoopIngestionSpec spec = new HadoopIngestionSpec( + new DataSchema( + "foo", + null, + new AggregatorFactory[0], + new UniformGranularitySpec( + Granularity.DAY, + null, + ImmutableList.of( + new Interval("2010-01-01/P1D") + ) + ) + ), + new HadoopIOConfig( + jsonMapper.convertValue(datasourcePathSpec, Map.class), + null, + null + ), + null + ); + + spec = jsonMapper.readValue( + jsonMapper.writeValueAsString(spec), + HadoopIngestionSpec.class + ); + + UsedSegmentLister segmentLister = EasyMock.createMock(UsedSegmentLister.class); + EasyMock.expect( + segmentLister.getUsedSegmentsForInterval(testDatasource, testDatasourceInterval) + ).andReturn(segments); + EasyMock.replay(segmentLister); + + spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(spec, jsonMapper, segmentLister); + return HadoopDruidIndexerConfig.fromString(jsonMapper.writeValueAsString(spec)); + } +} diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java index 304006c8de97..cf62f47277e2 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/DatasourcePathSpecTest.java @@ -42,12 +42,10 @@ import io.druid.indexer.HadoopTuningConfig; import io.druid.indexer.hadoop.DatasourceIngestionSpec; import io.druid.indexer.hadoop.DatasourceInputFormat; -import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.initialization.Initialization; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; -import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.server.DruidNode; @@ -67,6 +65,7 @@ public class DatasourcePathSpecTest { private DatasourceIngestionSpec ingestionSpec; + private List segments; public DatasourcePathSpecTest() { @@ -78,13 +77,44 @@ public DatasourcePathSpecTest() null, null ); + + segments = ImmutableList.of( + new DataSegment( + ingestionSpec.getDataSource(), + Interval.parse("2000/3000"), + "ver", + ImmutableMap.of( + "type", "local", + "path", "/tmp/index.zip" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12334 + ), + new DataSegment( + ingestionSpec.getDataSource(), + Interval.parse("2050/3000"), + "ver", + ImmutableMap.of( + "type", "hdfs", + "path", "/tmp/index.zip" + ), + ImmutableList.of("product"), + ImmutableList.of("visited_sum", "unique_hosts"), + new NoneShardSpec(), + 9, + 12335 + ) + ); } @Test public void testSerde() throws Exception { - final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock( - IndexerMetadataStorageCoordinator.class + final UsedSegmentLister segmentList = EasyMock.createMock( + UsedSegmentLister.class ); Injector injector = Initialization.makeInjectorWithModules( @@ -95,7 +125,7 @@ public void testSerde() throws Exception @Override public void configure(Binder binder) { - binder.bind(IndexerMetadataStorageCoordinator.class).toInstance(indexerMetadataStorageCoordinator); + binder.bind(UsedSegmentLister.class).toInstance(segmentList); JsonConfigProvider.bindInstance( binder, Key.get(DruidNode.class, Self.class), new DruidNode("dummy-node", null, null) ); @@ -107,8 +137,8 @@ public void configure(Binder binder) ObjectMapper jsonMapper = injector.getInstance(ObjectMapper.class); DatasourcePathSpec expected = new DatasourcePathSpec( - indexerMetadataStorageCoordinator, jsonMapper, + null, ingestionSpec, Long.valueOf(10) ); @@ -116,8 +146,17 @@ public void configure(Binder binder) Assert.assertEquals(expected, actual); expected = new DatasourcePathSpec( - indexerMetadataStorageCoordinator, jsonMapper, + null, + ingestionSpec, + null + ); + actual = jsonMapper.readValue(jsonMapper.writeValueAsString(expected), PathSpec.class); + Assert.assertEquals(expected, actual); + + expected = new DatasourcePathSpec( + jsonMapper, + segments, ingestionSpec, null ); @@ -161,46 +200,13 @@ public void testAddInputPaths() throws Exception ) ); - List segments = ImmutableList.of( - new DataSegment( - ingestionSpec.getDataSource(), - Interval.parse("2000/3000"), - "ver", - ImmutableMap.of( - "type", "local", - "path", "/tmp/index.zip" - ), - ImmutableList.of("product"), - ImmutableList.of("visited_sum", "unique_hosts"), - new NoneShardSpec(), - 9, - 12334 - ), - new DataSegment( - ingestionSpec.getDataSource(), - Interval.parse("2050/3000"), - "ver", - ImmutableMap.of( - "type", "hdfs", - "path", "/tmp/index.zip" - ), - ImmutableList.of("product"), - ImmutableList.of("visited_sum", "unique_hosts"), - new NoneShardSpec(), - 9, - 12335 - ) - ); - IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator = EasyMock.createMock(IndexerMetadataStorageCoordinator.class); - EasyMock.expect(indexerMetadataStorageCoordinator.getUsedSegmentsForInterval(ingestionSpec.getDataSource(), ingestionSpec.getInterval())).andReturn(segments); - EasyMock.replay(indexerMetadataStorageCoordinator); ObjectMapper mapper = new DefaultObjectMapper(); DatasourcePathSpec pathSpec = new DatasourcePathSpec( - indexerMetadataStorageCoordinator, mapper, + segments, ingestionSpec, null ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 08d9c48fa023..e865842ce999 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -17,10 +17,12 @@ package io.druid.indexing.common.task; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -33,17 +35,22 @@ import io.druid.indexer.HadoopIngestionSpec; import io.druid.indexer.Jobby; import io.druid.indexer.MetadataStorageUpdaterJobHandler; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; import io.druid.indexing.common.actions.LockTryAcquireAction; +import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.indexing.common.actions.TaskActionClient; +import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.SortedSet; public class HadoopIndexTask extends HadoopTask @@ -56,10 +63,14 @@ private static String getTheDataSource(HadoopIngestionSpec spec) } @JsonIgnore - private final HadoopIngestionSpec spec; + private HadoopIngestionSpec spec; + @JsonIgnore private final String classpathPrefix; + @JsonIgnore + private final ObjectMapper jsonMapper; + /** * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters * for creating Druid index segments. It may be modified. @@ -76,7 +87,8 @@ public HadoopIndexTask( @JsonProperty("spec") HadoopIngestionSpec spec, @JsonProperty("hadoopCoordinates") String hadoopCoordinates, @JsonProperty("hadoopDependencyCoordinates") List hadoopDependencyCoordinates, - @JsonProperty("classpathPrefix") String classpathPrefix + @JsonProperty("classpathPrefix") String classpathPrefix, + @JacksonInject ObjectMapper jsonMapper ) { super( @@ -102,6 +114,7 @@ public HadoopIndexTask( ); this.classpathPrefix = classpathPrefix; + this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMappper"); } @Override @@ -152,6 +165,11 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception final ClassLoader loader = buildClassLoader(toolbox); boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); + spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( + spec, + jsonMapper, + new OverlordActionBasedUsedSegmentLister(toolbox)); + final String config = invokeForeignLoader( "io.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessing", new String[]{ diff --git a/indexing-service/src/main/java/io/druid/indexing/hadoop/OverlordActionBasedUsedSegmentLister.java b/indexing-service/src/main/java/io/druid/indexing/hadoop/OverlordActionBasedUsedSegmentLister.java new file mode 100644 index 000000000000..723d927aeac5 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/hadoop/OverlordActionBasedUsedSegmentLister.java @@ -0,0 +1,54 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.indexing.hadoop; + +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.inject.Inject; +import io.druid.indexer.path.UsedSegmentLister; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentListUsedAction; +import io.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.io.IOException; +import java.util.List; + +/** + */ +public class OverlordActionBasedUsedSegmentLister implements UsedSegmentLister +{ + private final TaskToolbox toolbox; + + @Inject + public OverlordActionBasedUsedSegmentLister(TaskToolbox toolbox) + { + this.toolbox = Preconditions.checkNotNull(toolbox, "null task toolbox"); + } + + @Override + public List getUsedSegmentsForInterval( + String dataSource, Interval interval + ) throws IOException + { + return toolbox + .getTaskActionClient() + .submit(new SegmentListUsedAction(dataSource, interval)); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 52b41685573c..a406ef39a7e4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -471,11 +471,15 @@ public void testHadoopIndexTaskSerde() throws Exception ), null, null, - "blah" + "blah", + jsonMapper ); final String json = jsonMapper.writeValueAsString(task); - final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class); + + InjectableValues inject = new InjectableValues.Std() + .addValue(ObjectMapper.class, jsonMapper); + final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.reader(Task.class).with(inject).readValue(json); Assert.assertEquals("foo", task.getDataSource()); diff --git a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java index 03a28f02d13b..d7d35a33aac9 100644 --- a/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java +++ b/services/src/main/java/io/druid/cli/CliInternalHadoopIndexer.java @@ -30,14 +30,24 @@ import com.metamx.common.logger.Logger; import io.airlift.command.Arguments; import io.airlift.command.Command; +import io.druid.guice.LazySingleton; import io.druid.indexer.HadoopDruidDetermineConfigurationJob; import io.druid.indexer.HadoopDruidIndexerConfig; import io.druid.indexer.HadoopDruidIndexerJob; +import io.druid.indexer.HadoopIngestionSpec; import io.druid.indexer.JobHelper; import io.druid.indexer.Jobby; import io.druid.indexer.MetadataStorageUpdaterJobHandler; +import io.druid.indexer.hadoop.DatasourceIngestionSpec; +import io.druid.indexer.path.DatasourcePathSpec; +import io.druid.indexer.path.MetadataStoreBasedUsedSegmentLister; +import io.druid.indexer.path.MultiplePathSpec; +import io.druid.indexer.path.PathSpec; import io.druid.indexer.updater.MetadataStorageUpdaterJobSpec; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; import io.druid.metadata.MetadataStorageConnectorConfig; +import io.druid.metadata.MetadataStorageTablesConfig; import java.io.File; import java.net.URI; @@ -84,6 +94,10 @@ public void configure(Binder binder) binder.bind(new TypeLiteral>() {}) .toInstance(metadataSpec); + binder.bind(MetadataStorageTablesConfig.class).toInstance(metadataSpec.getMetadataStorageTablesConfig()); + binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in( + LazySingleton.class + ); } } ); @@ -95,11 +109,23 @@ public void run() try { Injector injector = makeInjector(); - MetadataStorageUpdaterJobSpec metadataSpec = getHadoopDruidIndexerConfig().getSchema().getIOConfig().getMetadataUpdateSpec(); + config = getHadoopDruidIndexerConfig(); + + MetadataStorageUpdaterJobSpec metadataSpec = config.getSchema().getIOConfig().getMetadataUpdateSpec(); // override metadata storage type based on HadoopIOConfig Preconditions.checkNotNull(metadataSpec.getType(), "type in metadataUpdateSpec must not be null"); injector.getInstance(Properties.class).setProperty("druid.metadata.storage.type", metadataSpec.getType()); + config = HadoopDruidIndexerConfig.fromSpec( + HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( + config.getSchema(), + HadoopDruidIndexerConfig.jsonMapper, + new MetadataStoreBasedUsedSegmentLister( + injector.getInstance(IndexerMetadataStorageCoordinator.class) + ) + ) + ); + List jobs = Lists.newArrayList(); jobs.add(new HadoopDruidDetermineConfigurationJob(config)); jobs.add(new HadoopDruidIndexerJob(config, injector.getInstance(MetadataStorageUpdaterJobHandler.class))); From a3bab5b7d96f2f81da02da3158504232c6906826 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 12 Aug 2015 17:33:09 -0500 Subject: [PATCH 6/7] IndexGeneratorJobTest type unit test for batch delta ingestion and reindexing --- .../indexer/BatchDeltaIngestionTest.java | 359 ++++++++++++++++++ 1 file changed, 359 insertions(+) create mode 100644 indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java new file mode 100644 index 000000000000..4272375af9e3 --- /dev/null +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -0,0 +1,359 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexer; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import com.metamx.common.Granularity; +import io.druid.data.input.Firehose; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.CSVParseSpec; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringInputRowParser; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularity; +import io.druid.indexer.path.DatasourcePathSpec; +import io.druid.indexer.path.UsedSegmentLister; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.segment.IndexIO; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexStorageAdapter; +import io.druid.segment.StorageAdapter; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.loading.LocalDataSegmentPuller; +import io.druid.segment.realtime.firehose.IngestSegmentFirehose; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.HashBasedNumberedShardSpec; +import org.apache.commons.io.FileUtils; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class BatchDeltaIngestionTest +{ + public final + @Rule + TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private ObjectMapper mapper; + private Interval interval; + private List segments; + + public BatchDeltaIngestionTest() throws IOException + { + mapper = new DefaultObjectMapper(); + mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed")); + InjectableValues inject = new InjectableValues.Std().addValue(ObjectMapper.class, mapper); + mapper.setInjectableValues(inject); + + this.interval = new Interval("2014-10-22T00:00:00Z/P1D"); + segments = ImmutableList.of( + new DefaultObjectMapper() + .readValue( + this.getClass().getClassLoader().getResource("test-segment/descriptor.json"), + DataSegment.class + ) + .withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath() + ) + ) + ); + } + + @Test + public void testReindexing() throws Exception + { + HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig( + ImmutableMap.of( + "type", + "dataSource", + "ingestionSpec", + ImmutableMap.of( + "dataSource", + "xyz", + "interval", + interval + ), + "segments", + segments + ), + temporaryFolder.newFolder() + ); + + List> expectedRows = ImmutableList.of( + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T00:00:00.000Z"), + "host", ImmutableList.of("a.example.com"), + "visited_sum", 100L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T01:00:00.000Z"), + "host", ImmutableList.of("b.example.com"), + "visited_sum", 150L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T02:00:00.000Z"), + "host", ImmutableList.of("c.example.com"), + "visited_sum", 200L, + "unique_hosts", 1.0d + ) + ); + + testIngestion(config, expectedRows); + } + + @Test + public void testDeltaIngestion() throws Exception + { + File dataFile = temporaryFolder.newFile(); + FileUtils.writeLines( + dataFile, + ImmutableList.of( + "2014102200,a.example.com,a.example.com,90", + "2014102201,b.example.com,b.example.com,25", + "2014102202,c.example.com,c.example.com,70" + ) + ); + + HadoopDruidIndexerConfig config = makeHadoopDruidIndexerConfig( + ImmutableMap.of( + "type", + "multi", + "children", + ImmutableList.of( + ImmutableMap.of( + "type", + "dataSource", + "ingestionSpec", + ImmutableMap.of( + "dataSource", + "xyz", + "interval", + interval + ), + "segments", + segments + ), + ImmutableMap.of( + "type", + "static", + "paths", + dataFile.getCanonicalPath() + ) + ) + ), + temporaryFolder.newFolder() + ); + + List> expectedRows = ImmutableList.of( + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T00:00:00.000Z"), + "host", ImmutableList.of("a.example.com"), + "visited_sum", 190L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T01:00:00.000Z"), + "host", ImmutableList.of("b.example.com"), + "visited_sum", 175L, + "unique_hosts", 1.0d + ), + ImmutableMap.of( + "time", DateTime.parse("2014-10-22T02:00:00.000Z"), + "host", ImmutableList.of("c.example.com"), + "visited_sum", 270L, + "unique_hosts", 1.0d + ) + ); + + testIngestion(config, expectedRows); + } + + private void testIngestion(HadoopDruidIndexerConfig config, List> expectedRowsGenerated) + throws Exception + { + IndexGeneratorJob job = new LegacyIndexGeneratorJob(config); + JobHelper.runJobs(ImmutableList.of(job), config); + + File segmentFolder = new File( + String.format( + "%s/%s/%s_%s/%s/0", + config.getSchema().getIOConfig().getSegmentOutputPath(), + config.getSchema().getDataSchema().getDataSource(), + interval.getStart().toString(), + interval.getEnd().toString(), + config.getSchema().getTuningConfig().getVersion() + ) + ); + + Assert.assertTrue(segmentFolder.exists()); + + File descriptor = new File(segmentFolder, "descriptor.json"); + File indexZip = new File(segmentFolder, "index.zip"); + Assert.assertTrue(descriptor.exists()); + Assert.assertTrue(indexZip.exists()); + + DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class); + Assert.assertEquals("website", dataSegment.getDataSource()); + Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion()); + Assert.assertEquals(interval, dataSegment.getInterval()); + Assert.assertEquals("local", dataSegment.getLoadSpec().get("type")); + Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path")); + Assert.assertEquals("host", dataSegment.getDimensions().get(0)); + Assert.assertEquals("visited_sum", dataSegment.getMetrics().get(0)); + Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); + Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion()); + + HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec(); + Assert.assertEquals(0, spec.getPartitionNum()); + Assert.assertEquals(1, spec.getPartitions()); + + File tmpUnzippedSegmentDir = temporaryFolder.newFolder(); + new LocalDataSegmentPuller().getSegmentFiles(dataSegment, tmpUnzippedSegmentDir); + + QueryableIndex index = IndexIO.loadIndex(tmpUnzippedSegmentDir); + StorageAdapter adapter = new QueryableIndexStorageAdapter(index); + + Firehose firehose = new IngestSegmentFirehose( + ImmutableList.of(adapter), + ImmutableList.of("host"), + ImmutableList.of("visited_sum", "unique_hosts"), + null, + interval, + QueryGranularity.NONE + ); + + List rows = Lists.newArrayList(); + while (firehose.hasMore()) { + rows.add(firehose.nextRow()); + } + + verifyRows(expectedRowsGenerated, rows); + } + + private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map inputSpec, File tmpDir) + throws Exception + { + HadoopDruidIndexerConfig config = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + "website", + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(ImmutableList.of("host"), null, null), + null, + ImmutableList.of("timestamp", "host", "host2", "visited_num") + ) + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("visited_sum", "visited_num"), + new HyperUniquesAggregatorFactory("unique_hosts", "host2") + }, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval) + ) + ), + new HadoopIOConfig( + inputSpec, + null, + tmpDir.getCanonicalPath() + ), + new HadoopTuningConfig( + tmpDir.getCanonicalPath(), + null, + null, + null, + null, + null, + false, + false, + false, + false, + null, + false, + false, + false, + null, + null, + false + ) + ) + ); + + config.setShardSpecs( + ImmutableMap.>of( + interval.getStart(), + ImmutableList.of( + new HadoopyShardSpec( + new HashBasedNumberedShardSpec(0, 1, HadoopDruidIndexerConfig.jsonMapper), + 0 + ) + ) + ) + ); + config = HadoopDruidIndexerConfig.fromSpec(config.getSchema()); + return config; + } + + private void verifyRows(List> expectedRows, List actualRows) + { + Assert.assertEquals(expectedRows.size(), actualRows.size()); + + for (int i = 0; i < expectedRows.size(); i++) { + Map expected = expectedRows.get(i); + InputRow actual = actualRows.get(i); + + Assert.assertEquals(ImmutableList.of("host"), actual.getDimensions()); + + Assert.assertEquals(expected.get("time"), actual.getTimestamp()); + Assert.assertEquals(expected.get("host"), actual.getDimension("host")); + Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum")); + Assert.assertEquals( + (Double) expected.get("unique_hosts"), + (Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")), + 0.001 + ); + } + } +} From cfd81bfac712a07d1bf39e6fe39157bb712b8e8b Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 22 Jul 2015 13:49:57 -0500 Subject: [PATCH 7/7] updating the docs on how to do hadoop batch re-ingesion and delta ingestion --- docs/content/ingestion/batch-ingestion.md | 59 ++++++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md index 35a671c67692..4af56f2f21d5 100644 --- a/docs/content/ingestion/batch-ingestion.md +++ b/docs/content/ingestion/batch-ingestion.md @@ -136,7 +136,7 @@ There are multiple types of inputSpecs: ##### `static` -Is a type of data loader where a static path to where the data files are located is passed. +Is a type of inputSpec where a static path to where the data files are located is passed. |Field|Type|Description|Required| |-----|----|-----------|--------| @@ -150,7 +150,7 @@ For example, using the static input paths: ##### `granularity` -Is a type of data loader that expects data to be laid out in a specific path format. Specifically, it expects it to be segregated by day in this directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (dates are represented by lowercase, time is represented by uppercase). +Is a type of inputSpec that expects data to be laid out in a specific path format. Specifically, it expects it to be segregated by day in this directory format `y=XXXX/m=XX/d=XX/H=XX/M=XX/S=XX` (dates are represented by lowercase, time is represented by uppercase). |Field|Type|Description|Required| |-----|----|-----------|--------| @@ -166,6 +166,61 @@ s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=01 ... s3n://billy-bucket/the/data/is/here/y=2012/m=06/d=01/H=23 ``` +##### `dataSource` + +It is a type of inputSpec that reads data already stored inside druid. It is useful for doing "re-indexing". A usecase would be that you ingested some data in some interval and at a later time you wanted to change granularity of rows or remove some columns from the data stored in druid. + +|Field|Type|Description|Required| +|-----|----|-----------|--------| +|ingestionSpec|Json Object|Specification of druid segments to be loaded. See below.|yes| +|maxSplitSize|Number|Enables combining multiple segments into single Hadoop InputSplit according to size of segments. Default is none. |no| + +Here is what goes inside "ingestionSpec" +|Field|Type|Description|Required| +|dataSource|String|Druid dataSource name from which you are loading the data.|yes| +|interval|String|A string representing ISO-8601 Intervals.|yes| +|granularity|String|Defines the granularity of the query while loading data. Default value is "none".See [Granularities](../querying/granularities.html).|no| +|filter|Json|See [Filters](../querying/filters.html)|no| +|dimensions|Array of String|Name of dimension columns to load. By default, the list will be constructed from parseSpec. If parseSpec does not have explicit list of dimensions then all the dimension columns present in stored data will be read.|no| +|metrics|Array of String|Name of metric columns to load. By default, the list will be constructed from the "name" of all the configured aggregators.|no| + + +For example + +``` +"ingestionSpec" : + { + "dataSource": "wikipedia", + "interval": "2014-10-20T00:00:00Z/P2W" + } +``` + +##### `multi` + +It is a composing inputSpec to combine two other input specs. It is useful for doing "delta ingestion". A usecase would be that you ingested some data in some interval and at a later time you wanted to "append" more data to that interval. You can use this inputSpec to combine `dataSource` and `static` (or others) input specs to add more data to an already indexed interval. + +|Field|Type|Description|Required| +|-----|----|-----------|--------| +|children|Array of Json Objects|List of json objects containing other inputSpecs |yes| + +For example + +``` +"children": [ + { + "type" : "dataSource", + "ingestionSpec" : { + "dataSource": "wikipedia", + "interval": "2014-10-20T00:00:00Z/P2W" + } + }, + { + "type" : "static", + "paths": "/path/to/more/wikipedia/data/" + } +] +``` + #### Metadata Update Job Spec