diff --git a/common/src/main/java/io/druid/timeline/TimelineObjectHolder.java b/common/src/main/java/io/druid/timeline/TimelineObjectHolder.java
index de7dcd28c43d..ecb16ceaf06f 100644
--- a/common/src/main/java/io/druid/timeline/TimelineObjectHolder.java
+++ b/common/src/main/java/io/druid/timeline/TimelineObjectHolder.java
@@ -28,16 +28,19 @@ public class TimelineObjectHolder<VersionType, ObjectType> implements LogicalSegment
 {
   private final Interval interval;
   private final VersionType version;
+  private final long approxSize;
   private final PartitionHolder<ObjectType> object;
 
   public TimelineObjectHolder(
       Interval interval,
       VersionType version,
+      long approxSize,
       PartitionHolder<ObjectType> object
   )
   {
     this.interval = interval;
     this.version = version;
+    this.approxSize = approxSize;
     this.object = object;
   }
 
@@ -52,6 +55,11 @@ public VersionType getVersion()
     return version;
   }
 
+  public long getApproximatedSize()
+  {
+    return approxSize;
+  }
+
   public PartitionHolder<ObjectType> getObject()
   {
     return object;
diff --git a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
index 19219f59d160..a6ab3b8cb5f3 100644
--- a/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
+++ b/common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
@@ -41,17 +41,17 @@
 /**
  * VersionedIntervalTimeline is a data structure that manages objects on a specific timeline.
- *
+ * <p>

 * It associates a jodatime Interval and a generically-typed version with the object that is being stored.
- *
+ * <p>

 * In the event of overlapping timeline entries, timeline intervals may be chunked. The underlying data associated
 * with a timeline entry remains unchanged when chunking occurs.
- *
+ * <p>
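 * For example, if a holder for version "v1" covers 2016-01-01/2016-01-03 and a later "v2" covers
 * 2016-01-02/2016-01-04, a lookup over 2016-01-01/2016-01-04 is chunked into a "v1" holder for
 * 2016-01-01/2016-01-02 followed by a "v2" holder for 2016-01-02/2016-01-04.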

 * After loading objects via the add() method, the lookup(Interval) method can be used to get the list of the most
 * recent objects (according to the version) that match the given interval. The intent is that objects represent
 * a certain time period and when you do a lookup(), you are asking for all of the objects that you need to look
 * at in order to get a correct answer about that time period.
- *
+ * <p>
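 * For illustration, a minimal usage sketch (assumed setup, not part of this patch: String
 * versions under their natural ordering, Integer payloads in single-element chunks; the -1
 * approxSize matches what this patch's tests pass):
 * <pre>{@code
 *   VersionedIntervalTimeline<String, Integer> timeline =
 *       new VersionedIntervalTimeline<>(Ordering.natural());
 *   timeline.add(new Interval("2016-01-01/2016-01-02"), "v1", new SingleElementPartitionChunk<Integer>(1), -1);
 *   timeline.add(new Interval("2016-01-01/2016-01-02"), "v2", new SingleElementPartitionChunk<Integer>(2), -1);
 *
 *   // "v2" overshadows "v1": lookup() returns a single holder for v2, and
 *   // findOvershadowed() reports the v1 entry
 *   List<TimelineObjectHolder<String, Integer>> holders =
 *       timeline.lookup(new Interval("2016-01-01/2016-01-02"));
 * }</pre>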

 * The findOvershadowed() method returns a list of objects that will never be returned by a call to lookup() because
 * they are overshadowed by some other object. This can be used in conjunction with the add() and remove() methods
 * to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if
@@ -81,6 +81,11 @@ public VersionedIntervalTimeline(
   }
 
   public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
+  {
+    add(interval, version, object, -1);
+  }
+
+  public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object, long approxSize)
   {
     try {
       lock.writeLock().lock();
@@ -89,7 +94,7 @@ public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
       TimelineEntry entry;
 
       if (exists == null) {
-        entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object));
+        entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object), approxSize);
         TreeMap<VersionType, TimelineEntry> versionEntry = new TreeMap<VersionType, TimelineEntry>(versionComparator);
         versionEntry.put(version, entry);
         allTimelineEntries.put(interval, versionEntry);
@@ -97,7 +102,7 @@ public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
         entry = exists.get(version);
 
         if (entry == null) {
-          entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object));
+          entry = new TimelineEntry(interval, version, new PartitionHolder<ObjectType>(object), approxSize);
           exists.put(version, entry);
         } else {
           PartitionHolder<ObjectType> partitionHolder = entry.getPartitionHolder();
@@ -179,7 +184,7 @@ public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version)
    * @param interval interval to find objects for
    *
    * @return Holders representing the interval that the objects exist for, PartitionHolders
-   *         are guaranteed to be complete
+   * are guaranteed to be complete
    */
   public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
   {
@@ -244,6 +249,7 @@ public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()
             new TimelineObjectHolder<VersionType, ObjectType>(
                 object.getTrueInterval(),
                 object.getVersion(),
+                object.getApproximatedSize(),
                 object.getPartitionHolder()
             )
         );
@@ -293,10 +299,10 @@ private void add(
   }
 
   /**
-   *
    * @param timeline
    * @param key
    * @param entry
+   *
    * @return boolean flag indicating whether or not we inserted or discarded something
    */
  private boolean addAtKey(
@@ -446,6 +452,7 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
             new TimelineObjectHolder<VersionType, ObjectType>(
                 timelineInterval,
                 val.getVersion(),
+                val.getApproximatedSize(),
                 val.getPartitionHolder()
             )
         );
@@ -464,6 +471,7 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
           new TimelineObjectHolder<VersionType, ObjectType>(
               new Interval(interval.getStart(), firstEntry.getInterval().getEnd()),
               firstEntry.getVersion(),
+              firstEntry.getApproximatedSize(),
               firstEntry.getObject()
           )
       );
@@ -476,6 +484,7 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
           new TimelineObjectHolder<VersionType, ObjectType>(
               new Interval(lastEntry.getInterval().getStart(), interval.getEnd()),
               lastEntry.getVersion(),
+              lastEntry.getApproximatedSize(),
               lastEntry.getObject()
           )
       );
@@ -489,12 +498,19 @@ public class TimelineEntry
     private final Interval trueInterval;
     private final VersionType version;
     private final PartitionHolder<ObjectType> partitionHolder;
-
-    public TimelineEntry(Interval trueInterval, VersionType version, PartitionHolder<ObjectType> partitionHolder)
+    private final long approxSize;
+
+    public TimelineEntry(
+        Interval trueInterval,
+        VersionType version,
+        PartitionHolder<ObjectType> partitionHolder,
+        long approxSize
+    )
     {
       this.trueInterval = trueInterval;
       this.version = version;
       this.partitionHolder = partitionHolder;
+      this.approxSize = approxSize;
     }
 
     public Interval getTrueInterval()
@@ -511,5 +527,10 @@ public PartitionHolder<ObjectType> getPartitionHolder()
     {
       return partitionHolder;
     }
+
+    public long getApproximatedSize()
+    {
+      return approxSize;
+    }
   }
 }
diff --git a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java
b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java
index bc88e39d9050..0bcebf0b83bb 100644
--- a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java
+++ b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java
@@ -1552,7 +1552,7 @@ private void add(String interval, String version, PartitionChunk<Integer> value)
 
   private void add(Interval interval, String version, PartitionChunk<Integer> value)
   {
-    timeline.add(interval, version, value);
+    timeline.add(interval, version, value, -1);
   }
 
   private void assertValues(
diff --git a/extensions-core/hive-extensions/pom.xml b/extensions-core/hive-extensions/pom.xml
new file mode 100644
index 000000000000..6ebb8628ded2
--- /dev/null
+++ b/extensions-core/hive-extensions/pom.xml
@@ -0,0 +1,197 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>io.druid.extensions</groupId>
+  <artifactId>druid-hive-extensions</artifactId>
+  <name>druid-hive-extensions</name>
+  <description>druid-hive-extensions</description>
+
+  <parent>
+    <groupId>io.druid</groupId>
+    <artifactId>druid</artifactId>
+    <version>0.9.1-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-api</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-common</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-indexing-hadoop</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-exec</artifactId>
+      <version>2.0.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-cli</groupId>
+          <artifactId>commons-cli</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-httpclient</groupId>
+          <artifactId>commons-httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-io</groupId>
+          <artifactId>commons-io</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpcore</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.ws.rs</groupId>
+          <artifactId>jsr311-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.activation</groupId>
+          <artifactId>activation</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.metamx</groupId>
+      <artifactId>emitter</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Tests -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java
new file mode 100644
index 000000000000..16644680ebc3
--- /dev/null
+++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveInputFormat.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.hive; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Functions; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import io.druid.indexer.hadoop.QueryBasedInputFormat; +import io.druid.query.filter.AndDimFilter; +import io.druid.query.filter.DimFilter; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.Progressable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class DruidHiveInputFormat extends QueryBasedInputFormat implements HiveOutputFormat +{ + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException + { + DruidInputSplit[] splits = getInputSplits(job); + + String input = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, ""); + String[] dirs = org.apache.hadoop.util.StringUtils.split(input); + if (dirs.length == 0) { + throw new IllegalStateException("input dir is null"); + } + Path path = new Path(dirs[0]); + InputSplit[] converted = new InputSplit[splits.length]; + for (int i = 0; i < converted.length; i++) { + converted[i] = new InputSplitWrapper(path, splits[i]); + } + return converted; + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException + { + DruidRecordReader reader = new DruidRecordReader(); + reader.initialize(((InputSplitWrapper) split).druidSplit, job); + return reader; + } + + @Override + protected final Configuration configure(Configuration configuration, ObjectMapper mapper) + throws IOException + { + Map types = ExpressionConverter.getColumnTypes(configuration); + Map> converted = ExpressionConverter.convert(configuration, types); + List timeRanges = converted.remove(ExpressionConverter.TIME_COLUMN_NAME); + if (timeRanges == null || timeRanges.isEmpty()) { + throw new IllegalArgumentException("failed to extract intervals from predicate"); + } + configuration.set( + CONF_DRUID_INTERVALS, + StringUtils.join(Lists.transform(ExpressionConverter.toInterval(timeRanges), Functions.toStringFunction()), ",") + ); + + List filters = Lists.newArrayList(); + for (Map.Entry> entry : converted.entrySet()) { + if (types.get(entry.getKey()).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) { + DimFilter filter = 
ExpressionConverter.toFilter(entry.getKey(), entry.getValue()); + if (filter != null) { + filters.add(filter); + } + } + } + if (!filters.isEmpty()) { + configuration.set(CONF_DRUID_FILTERS, mapper.writeValueAsString(new AndDimFilter(filters).optimize())); + } + return configuration; + } + + @Override + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) + throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public FileSinkOperator.RecordWriter getHiveRecordWriter( + JobConf jc, + Path finalOutPath, + Class valueClass, + boolean isCompressed, + Properties tableProperties, + Progressable progress + ) throws IOException + { + throw new UnsupportedOperationException(); + } + + public static class InputSplitWrapper extends FileSplit + { + private DruidInputSplit druidSplit; + + public InputSplitWrapper() {} + + public InputSplitWrapper(Path path, DruidInputSplit druidSplit) + { + super(path, 0, 0, druidSplit.getLocations()); + this.druidSplit = druidSplit; + } + + public void write(DataOutput out) throws IOException + { + super.write(out); + druidSplit.write(out); + } + + public void readFields(DataInput in) throws IOException + { + super.readFields(in); + this.druidSplit = new DruidInputSplit(); + this.druidSplit.readFields(in); + } + } +} \ No newline at end of file diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveSerDe.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveSerDe.java new file mode 100644 index 000000000000..52b40375c2e7 --- /dev/null +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveSerDe.java @@ -0,0 +1,135 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.hive; + +import com.google.common.collect.Lists; +import com.metamx.common.logger.Logger; +import io.druid.indexer.hadoop.MapWritable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.Writable; +import org.joda.time.DateTime; + +import java.sql.Timestamp; +import java.util.Date; +import java.util.List; +import java.util.Properties; + +/** + */ +public class DruidHiveSerDe extends AbstractSerDe +{ + private static final Logger logger = new Logger(DruidHiveSerDe.class); + + private String[] columns; + private ObjectInspector inspector; + + private int timeIndex = -1; + private PrimitiveObjectInspector.PrimitiveCategory timeConvert; + + @Override + public void initialize(Configuration configuration, Properties properties) throws SerDeException + { + LazySerDeParameters serdeParams = new LazySerDeParameters(configuration, properties, getClass().getName()); + + List columnNames = serdeParams.getColumnNames(); + List columnTypes = serdeParams.getColumnTypes(); + + List inspectors = Lists.newArrayListWithExpectedSize(columnNames.size()); + for (int i = 0; i < columnTypes.size(); ++i) { + PrimitiveTypeInfo typeInfo = (PrimitiveTypeInfo) columnTypes.get(i); + inspectors.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(typeInfo)); + if (typeInfo.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING + && ExpressionConverter.TIME_COLUMN_NAME.equals(columnNames.get(i))) { + timeConvert = typeInfo.getPrimitiveCategory(); + timeIndex = i; + } + } + if (timeConvert != null && + timeConvert != PrimitiveObjectInspector.PrimitiveCategory.LONG && + timeConvert != PrimitiveObjectInspector.PrimitiveCategory.DATE && + timeConvert != PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP) { + logger.warn("Not supported time conversion type " + timeConvert + ".. 
falling back to string");
+      inspectors.set(timeIndex, PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+      timeIndex = -1;
+    }
+    inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, inspectors);
+    columns = columnNames.toArray(new String[columnNames.size()]);
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SerDeStats getSerDeStats()
+  {
+    return new SerDeStats();
+  }
+
+  @Override
+  public Object deserialize(Writable writable) throws SerDeException
+  {
+    MapWritable input = (MapWritable) writable;
+    List<Object> output = Lists.newArrayListWithExpectedSize(columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      Object v = input.getValue().get(columns[i]);
+      if (v != null && i == timeIndex) {
+        long timeMillis = new DateTime(v).getMillis();
+        switch (timeConvert) {
+          case LONG:
+            v = timeMillis;
+            break;
+          case DATE:
+            v = new Date(timeMillis);
+            break;
+          case TIMESTAMP:
+            v = new Timestamp(timeMillis);
+            break;
+        }
+      }
+      output.add(v);
+    }
+    return output;
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException
+  {
+    return inspector;
+  }
+}
diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveStorageHandler.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveStorageHandler.java
new file mode 100644
index 000000000000..c929eff5f134
--- /dev/null
+++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/DruidHiveStorageHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package io.druid.hive; + +import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; + +/** + */ +public class DruidHiveStorageHandler extends DefaultStorageHandler +{ + @Override + public Class getInputFormatClass() + { + return DruidHiveInputFormat.class; + } + + @Override + public Class getOutputFormatClass() + { + return DruidHiveInputFormat.class; + } + + @Override + public Class getSerDeClass() + { + return DruidHiveSerDe.class; + } +} diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java new file mode 100644 index 000000000000..b1885a4a867c --- /dev/null +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/ExpressionConverter.java @@ -0,0 +1,433 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.hive; + +import com.google.common.base.Function; +import com.google.common.collect.BoundType; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import com.metamx.common.logger.Logger; +import io.druid.common.utils.JodaUtils; +import io.druid.query.filter.BoundDimFilter; +import io.druid.query.filter.DimFilter; +import io.druid.query.filter.InDimFilter; +import io.druid.query.filter.OrDimFilter; +import io.druid.query.filter.SelectorDimFilter; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.joda.time.Interval; + +import java.sql.Timestamp; +import java.text.DateFormat; +import java.text.ParseException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + */ +public class ExpressionConverter +{ + private static final Logger logger = new Logger(ExpressionConverter.class); + + public static final String TIME_COLUMN_NAME = "__time"; + + static Map> convert(Configuration configuration, Map types) + { + String filterExprSerialized = configuration.get(TableScanDesc.FILTER_EXPR_CONF_STR); + if (filterExprSerialized == null) { + logger.info("No predicate is pushed down"); + return Collections.emptyMap(); + } + return getRanges(SerializationUtilities.deserializeExpression(filterExprSerialized), types); + } + + public static List toInterval(List ranges) + { + List intervals = Lists.transform( + ranges, new Function() + { + @Override + public Interval apply(Range range) + { + long start = range.hasLowerBound() ? toLong(range.lowerEndpoint()) : JodaUtils.MIN_INSTANT; + long end = range.hasUpperBound() ? toLong(range.upperEndpoint()) : JodaUtils.MAX_INSTANT; + if (range.hasLowerBound() && range.lowerBoundType() == BoundType.OPEN) { + start++; + } + if (range.hasUpperBound() && range.upperBoundType() == BoundType.CLOSED) { + end++; + } + return new Interval(start, end); + } + } + ); + logger.info("Converted time ranges %s to interval %s", ranges, intervals); + return intervals; + } + + // should be string type + public static DimFilter toFilter(String dimension, List ranges) + { + Iterable filtered = Iterables.filter(ranges, Ranges.VALID); + List equalValues = Lists.newArrayList(); + List dimFilters = Lists.newArrayList(); + for (Range range : filtered) { + String lower = range.hasLowerBound() ? (String) range.lowerEndpoint() : null; + String upper = range.hasUpperBound() ? 
(String) range.upperEndpoint() : null; + if (lower == null && upper == null) { + return null; + } + if (Objects.equals(lower, upper)) { + equalValues.add(lower); + continue; + } + boolean lowerStrict = range.hasLowerBound() && range.lowerBoundType() == BoundType.OPEN; + boolean upperStrict = range.hasUpperBound() && range.upperBoundType() == BoundType.OPEN; + dimFilters.add(new BoundDimFilter(dimension, lower, upper, lowerStrict, upperStrict, false, null)); + } + if (equalValues.size() > 1) { + dimFilters.add(new InDimFilter(dimension, equalValues, null)); + } else if (equalValues.size() == 1) { + dimFilters.add(new SelectorDimFilter(dimension, equalValues.get(0), null)); + } + DimFilter dimFilter = new OrDimFilter(dimFilters).optimize(); + logger.info("Converted dimension '%s' ranges %s to filter %s", dimension, ranges, dimFilter); + return dimFilter; + } + + public static Map getColumnTypes(Configuration configuration) + { + String[] colNames = configuration.getStrings(serdeConstants.LIST_COLUMNS); + String[] colTypes = configuration.getStrings(serdeConstants.LIST_COLUMN_TYPES); + Set projections = Sets.newHashSet(ColumnProjectionUtils.getReadColumnIDs(configuration)); + if (colNames == null || colTypes == null) { + return ImmutableMap.of(); + } + Map typeMap = Maps.newHashMap(); + for (int i = 0; i < colTypes.length; i++) { + if (!projections.isEmpty() && !projections.contains(i)) { + continue; + } + String colName = colNames[i].trim(); + PrimitiveTypeInfo typeInfo = TypeInfoFactory.getPrimitiveTypeInfo(colTypes[i]); + if (colName.equals(ExpressionConverter.TIME_COLUMN_NAME) && + typeInfo.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.LONG && + typeInfo.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.TIMESTAMP) { + logger.warn("time column should be defined as bigint or timestamp type"); + } + typeMap.put(colName, typeInfo); + } + return typeMap; + } + + static Map> getRanges(ExprNodeGenericFuncDesc filterExpr, Map types) + { + logger.info("Start analyzing predicate " + filterExpr.getExprString()); + SearchArgument searchArgument = ConvertAstToSearchArg.create(filterExpr); + ExpressionTree root = searchArgument.getExpression(); + + List leaves = Lists.newArrayList(searchArgument.getLeaves()); + + Map> rangeMap = Maps.newHashMap(); + if (root.getOperator() == ExpressionTree.Operator.AND) { + for (ExpressionTree child : root.getChildren()) { + String extracted = extractSoleColumn(child, leaves); + if (extracted == null) { + continue; + } + PrimitiveTypeInfo type = types.get(extracted); + List ranges = extractRanges(type, child, leaves, false); + if (ranges == null) { + continue; + } + if (rangeMap.get(extracted) == null) { + rangeMap.put(extracted, ranges); + continue; + } + // (a or b) and (c or d) -> (a and c) or (b and c) or (a and d) or (b and d) + List overlapped = Lists.newArrayList(); + for (Range current : rangeMap.get(extracted)) { + for (Range interval : ranges) { + if (current.isConnected(interval)) { + overlapped.add(current.intersection(interval)); + } + } + rangeMap.put(extracted, overlapped); + } + } + } else { + String extracted = extractSoleColumn(root, leaves); + if (extracted != null) { + PrimitiveTypeInfo type = types.get(extracted); + List ranges = extractRanges(type, root, leaves, false); + if (ranges != null) { + rangeMap.put(extracted, ranges); + } + } + } + + Map> rangesMap = Maps.transformValues(rangeMap, Ranges.COMPACT); + for (Map.Entry> entry : rangesMap.entrySet()) { + logger.info(">> " + entry); + } + return 
rangesMap; + } + + private static String extractSoleColumn(ExpressionTree tree, List leaves) + { + if (tree.getOperator() == ExpressionTree.Operator.LEAF) { + return leaves.get(tree.getLeaf()).getColumnName(); + } + String current = null; + List children = tree.getChildren(); + if (children != null && !children.isEmpty()) { + for (ExpressionTree child : children) { + String resolved = extractSoleColumn(child, leaves); + if (current != null && !current.equals(resolved)) { + return null; + } + current = resolved; + } + } + return current; + } + + private static List extractRanges( + PrimitiveTypeInfo type, + ExpressionTree tree, + List leaves, + boolean withNot + ) + { + if (tree.getOperator() == ExpressionTree.Operator.NOT) { + return extractRanges(type, tree.getChildren().get(0), leaves, !withNot); + } + if (tree.getOperator() == ExpressionTree.Operator.LEAF) { + return leafToRanges(type, leaves.get(tree.getLeaf()), withNot); + } + if (tree.getOperator() == ExpressionTree.Operator.OR) { + List intervals = Lists.newArrayList(); + for (ExpressionTree child : tree.getChildren()) { + List extracted = extractRanges(type, child, leaves, withNot); + if (extracted != null) { + intervals.addAll(extracted); + } + } + return intervals; + } + return null; + } + + private static List leafToRanges(PrimitiveTypeInfo type, PredicateLeaf hiveLeaf, boolean withNot) + { + PredicateLeaf.Operator operator = hiveLeaf.getOperator(); + switch (operator) { + case LESS_THAN: + case LESS_THAN_EQUALS: + case EQUALS: // in druid, all equals are null-safe equals + case NULL_SAFE_EQUALS: + Comparable value = literalToType(hiveLeaf.getLiteral(), type); + if (value == null) { + return null; + } + if (operator == PredicateLeaf.Operator.LESS_THAN) { + return Arrays.asList(withNot ? Range.atLeast(value) : Range.lessThan(value)); + } else if (operator == PredicateLeaf.Operator.LESS_THAN_EQUALS) { + return Arrays.asList(withNot ? Range.greaterThan(value) : Range.atMost(value)); + } else { + if (!withNot) { + return Arrays.asList(Range.closed(value, value)); + } + return Arrays.asList(Range.lessThan(value), Range.greaterThan(value)); + } + case BETWEEN: + Comparable value1 = literalToType(hiveLeaf.getLiteralList().get(0), type); + Comparable value2 = literalToType(hiveLeaf.getLiteralList().get(1), type); + if (value1 == null || value2 == null) { + return null; + } + boolean inverted = value1.compareTo(value2) > 0; + if (!withNot) { + return Arrays.asList(inverted ? Range.closed(value2, value1) : Range.closed(value1, value2)); + } + return Arrays.asList( + Range.lessThan(inverted ? value2 : value1), + Range.greaterThan(inverted ? 
value1 : value2) + ); + case IN: + List ranges = Lists.newArrayList(); + for (Object literal : hiveLeaf.getLiteralList()) { + Comparable element = literalToType(literal, type); + if (element == null) { + return null; + } + if (withNot) { + ranges.addAll(Arrays.asList(Range.lessThan(element), Range.greaterThan(element))); + } else { + ranges.add(Range.closed(element, element)); + } + } + return ranges; + } + return null; + } + + private static Comparable literalToType(Object literal, PrimitiveTypeInfo type) + { + switch (type.getPrimitiveCategory()) { + case LONG: + return toLong(literal); + case INT: + return toInt(literal); + case FLOAT: + return toFloat(literal); + case DOUBLE: + return toDouble(literal); + case STRING: + return String.valueOf(literal); + case TIMESTAMP: + return toTimestamp(literal); + } + return null; + } + + private static Comparable toTimestamp(Object literal) + { + if (literal instanceof Timestamp) { + return (Timestamp)literal; + } + if (literal instanceof Date) { + return new Timestamp(((Date) literal).getTime()); + } + if (literal instanceof Number) { + return new Timestamp(((Number) literal).longValue()); + } + if (literal instanceof String) { + String string = (String) literal; + if (StringUtils.isNumeric(string)) { + return new Timestamp(Long.valueOf(string)); + } + try { + return Timestamp.valueOf(string); + } + catch (NumberFormatException e) { + // ignore + } + } + return null; + } + + private static Long toLong(Object literal) + { + if (literal instanceof Number) { + return ((Number) literal).longValue(); + } + if (literal instanceof Date) { + return ((Date) literal).getTime(); + } + if (literal instanceof Timestamp) { + return ((Timestamp) literal).getTime(); + } + if (literal instanceof String) { + try { + return Long.valueOf((String) literal); + } + catch (NumberFormatException e) { + // ignore + } + try { + return DateFormat.getDateInstance().parse((String) literal).getTime(); + } + catch (ParseException e) { + // best effort. ignore + } + } + return null; + } + + private static Integer toInt(Object literal) + { + if (literal instanceof Number) { + return ((Number) literal).intValue(); + } + if (literal instanceof String) { + try { + return Integer.valueOf((String) literal); + } + catch (NumberFormatException e) { + // ignore + } + } + return null; + } + + private static Float toFloat(Object literal) + { + if (literal instanceof Number) { + return ((Number) literal).floatValue(); + } + if (literal instanceof String) { + try { + return Float.valueOf((String) literal); + } + catch (NumberFormatException e) { + // ignore + } + } + return null; + } + + private static Double toDouble(Object literal) + { + if (literal instanceof Number) { + return ((Number) literal).doubleValue(); + } + if (literal instanceof String) { + try { + return Double.valueOf((String) literal); + } + catch (NumberFormatException e) { + // ignore + } + } + return null; + } +} diff --git a/extensions-core/hive-extensions/src/main/java/io/druid/hive/Ranges.java b/extensions-core/hive-extensions/src/main/java/io/druid/hive/Ranges.java new file mode 100644 index 000000000000..777b25da56ad --- /dev/null +++ b/extensions-core/hive-extensions/src/main/java/io/druid/hive/Ranges.java @@ -0,0 +1,122 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.hive; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.BoundType; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; + +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.TreeSet; + +/** + */ +public class Ranges +{ + public static final Predicate VALID = new Predicate() + { + @Override + public boolean apply(Range input) + { + return !input.isEmpty(); + } + }; + + public static final Function, List> COMPACT = new Function, List>() + { + @Override + public List apply(List input) + { + return Ranges.condenseRanges(input); + } + }; + + public static List condenseRanges(List ranges) + { + if (ranges.size() <= 1) { + return ranges; + } + + Comparator startThenEnd = new Comparator() + { + @Override + public int compare(Range lhs, Range rhs) + { + int compare = 0; + if (lhs.hasLowerBound() && rhs.hasLowerBound()) { + compare = lhs.lowerEndpoint().compareTo(rhs.lowerEndpoint()); + } else if (!lhs.hasLowerBound() && rhs.hasLowerBound()) { + compare = -1; + } else if (lhs.hasLowerBound() && !rhs.hasLowerBound()) { + compare = 1; + } + if (compare != 0) { + return compare; + } + if (lhs.hasUpperBound() && rhs.hasUpperBound()) { + compare = lhs.upperEndpoint().compareTo(rhs.upperEndpoint()); + } else if (!lhs.hasUpperBound() && rhs.hasUpperBound()) { + compare = -1; + } else if (lhs.hasUpperBound() && !rhs.hasUpperBound()) { + compare = 1; + } + return compare; + } + }; + + TreeSet sortedIntervals = Sets.newTreeSet(startThenEnd); + sortedIntervals.addAll(ranges); + + List retVal = Lists.newArrayList(); + + Iterator intervalsIter = sortedIntervals.iterator(); + Range currInterval = intervalsIter.next(); + while (intervalsIter.hasNext()) { + Range next = intervalsIter.next(); + if (currInterval.encloses(next)) { + continue; + } + if (mergeable(currInterval, next)) { + currInterval = currInterval.span(next); + } else { + retVal.add(currInterval); + currInterval = next; + } + } + retVal.add(currInterval); + + return retVal; + } + + public static boolean mergeable(Range range1, Range range2) + { + Comparable x1 = range1.upperEndpoint(); + Comparable x2 = range2.lowerEndpoint(); + int compare = x1.compareTo(x2); + return compare > 0 || (compare == 0 + && range1.upperBoundType() == BoundType.CLOSED + && range2.lowerBoundType() == BoundType.CLOSED); + } +} diff --git a/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java b/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java new file mode 100644 index 000000000000..48f548c2b9dd --- /dev/null +++ b/extensions-core/hive-extensions/src/test/java/io/druid/hive/ExpressionConverterTest.java @@ -0,0 +1,305 @@ +/* + * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.hive; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Functions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.filter.DimFilter; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class ExpressionConverterTest +{ + private final ObjectMapper mapper = new DefaultObjectMapper(); + + @Test + public void test() throws Exception + { + Map types = Maps.newHashMap(); + types.put("__time", TypeInfoFactory.longTypeInfo); + types.put("col1", TypeInfoFactory.stringTypeInfo); + types.put("col2", TypeInfoFactory.stringTypeInfo); + + ExprNodeColumnDesc longTime = new ExprNodeColumnDesc(Long.class, "__time", "some_table", false); + ExprNodeColumnDesc timestampTime = new ExprNodeColumnDesc(Timestamp.class, "__time", "some_table", false); + + ExprNodeColumnDesc someColumn1 = new ExprNodeColumnDesc(String.class, "col1", "some_table", false); + ExprNodeColumnDesc someColumn2 = new ExprNodeColumnDesc(String.class, "col2", "some_table", false); + + // cannot do this +// ExprNodeGenericFuncDesc timeCastToDouble = new ExprNodeGenericFuncDesc( +// PrimitiveObjectInspectorFactory.javaLongObjectInspector, +// new GenericUDFBridge("double", false, UDFToDouble.class.getName()), Arrays.asList(longTime) +// ); + + ExprNodeGenericFuncDesc noise1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPNotEqual(), + Arrays.asList( + someColumn1, + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value1") + ) + ); + 
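+    // In HiveQL terms (for readability; these comments are illustrative): noise1 above is
+    // "col1 <> 'value1'", while noise2 and noise3 below are "col2 > 'value2'" and
+    // "col2 IN ('value1', 'value2')" respectively.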
ExprNodeGenericFuncDesc noise2 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPGreaterThan(), + Arrays.asList( + someColumn2, + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value2") + ) + ); + ExprNodeGenericFuncDesc noise3 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFIn(), + Arrays.asList( + someColumn2, + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value1"), + new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, "value2") + ) + ); + ExprNodeGenericFuncDesc gt = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPGreaterThan(), + Arrays.asList( + timestampTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new Timestamp(new DateTime(2010, 1, 1, 0, 0).getMillis())) + ) + ); + ExprNodeGenericFuncDesc lt = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPLessThan(), + Arrays.asList( + longTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2012, 3, 1, 0, 0).getMillis()) + ) + ); + ExprNodeGenericFuncDesc all = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), + Arrays.asList(lt, gt) + ); + ExprNodeGenericFuncDesc mixed1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(noise3, lt) + ); + ExprNodeGenericFuncDesc between0 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(lt, gt) + ); + + ExprNodeGenericFuncDesc between1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFBetween(), + Arrays.asList( + new ExprNodeConstantDesc(false), + longTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2011, 6, 1, 10, 0).getMillis()), + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2016, 4, 1, 12, 0).getMillis()) + ) + ); + ExprNodeGenericFuncDesc between2 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFBetween(), + Arrays.asList( + new ExprNodeConstantDesc(false), + longTime, + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2016, 4, 1, 12, 0).getMillis()), + new ExprNodeConstantDesc(TypeInfoFactory.longTypeInfo, new DateTime(2017, 1, 1, 12, 10).getMillis()) + ) + ); + + ExprNodeGenericFuncDesc intersectAnd = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(between0, between1) + ); + ExprNodeGenericFuncDesc intersectOr = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), + Arrays.asList(between0, between1) + ); + + ExprNodeGenericFuncDesc abutAnd = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(between1, between2) + ); + ExprNodeGenericFuncDesc abutOr = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), + Arrays.asList(between1, between2) + ); + + ExprNodeGenericFuncDesc split = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), + 
Arrays.asList(between0, between2) + ); + + ExprNodeGenericFuncDesc complex1 = new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPOr(), + Arrays.asList( + new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(noise1, lt) + ), + new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList( + between1, + new ExprNodeGenericFuncDesc( + PrimitiveObjectInspectorFactory.javaBooleanObjectInspector, + new GenericUDFOPAnd(), + Arrays.asList(noise2, gt) + ) + ) + ) + ) + ); + + validate( + all, + types, + Arrays.asList("(-∞‥+∞)"), + Arrays.asList("-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z") + ); + validate( + mixed1, + types, + Arrays.asList("(-∞‥1330560000000)"), + Arrays.asList("-146136543-09-08T08:23:32.096Z/2012-03-01T00:00:00.000Z") + ); + validate( + between0, + types, + Arrays.asList("(1262304000000‥1330560000000)"), + Arrays.asList("2010-01-01T00:00:00.001Z/2012-03-01T00:00:00.000Z") + ); + validate( + between1, + types, + Arrays.asList("[1306922400000‥1459512000000]"), + Arrays.asList("2011-06-01T10:00:00.000Z/2016-04-01T12:00:00.001Z") + ); + validate( + between2, + types, + Arrays.asList("[1459512000000‥1483272600000]"), + Arrays.asList("2016-04-01T12:00:00.000Z/2017-01-01T12:10:00.001Z") + ); + + // 0 AND 1 + validate( + intersectAnd, + types, + Arrays.asList("[1306922400000‥1330560000000)"), + Arrays.asList("2011-06-01T10:00:00.000Z/2012-03-01T00:00:00.000Z") + ); + // 0 OR 1 + validate( + intersectOr, + types, + Arrays.asList("(1262304000000‥1459512000000]"), + Arrays.asList("2010-01-01T00:00:00.001Z/2016-04-01T12:00:00.001Z") + ); + // 1 AND 2 + validate( + abutAnd, + types, + Arrays.asList("[1459512000000‥1459512000000]"), + Arrays.asList("2016-04-01T12:00:00.000Z/2016-04-01T12:00:00.001Z") + ); + // 1 OR 2 + validate( + abutOr, + types, + Arrays.asList("[1306922400000‥1483272600000]"), + Arrays.asList("2011-06-01T10:00:00.000Z/2017-01-01T12:10:00.001Z") + ); + // 0 OR 2 + validate( + split, + types, + Arrays.asList("(1262304000000‥1330560000000)", "[1459512000000‥1483272600000]"), + Arrays.asList( + "2010-01-01T00:00:00.001Z/2012-03-01T00:00:00.000Z", + "2016-04-01T12:00:00.000Z/2017-01-01T12:10:00.001Z" + ) + ); + + validate( + complex1, + types, + Arrays.asList("(-∞‥1459512000000]"), + Arrays.asList("-146136543-09-08T08:23:32.096Z/2016-04-01T12:00:00.001Z") + ); + } + + private void validate( + ExprNodeGenericFuncDesc predicate, + Map types, + List expected1, + List expected2 + ) throws Exception + { + Map> converted = ExpressionConverter.getRanges(predicate, types); + List ranges = converted.remove(ExpressionConverter.TIME_COLUMN_NAME); + List intervals = ExpressionConverter.toInterval(ranges); + Assert.assertEquals(expected1, Lists.transform(ranges, Functions.toStringFunction())); + Assert.assertEquals(expected2, Lists.transform(intervals, Functions.toStringFunction())); + for (Map.Entry> entry : converted.entrySet()) { + System.out.println( + mapper.readValue( + mapper.writeValueAsString(ExpressionConverter.toFilter(entry.getKey(), entry.getValue())), DimFilter.class + ) + ); + } + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/MapWritable.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/MapWritable.java new file mode 100644 index 000000000000..ab4408156ebb --- /dev/null +++ 
b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/MapWritable.java @@ -0,0 +1,95 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexer.hadoop; + +import com.google.common.collect.Maps; +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + */ +public class MapWritable implements Writable +{ + private final Map value; + + public MapWritable() + { + value = Maps.newHashMap(); + } + + public MapWritable(Map value) + { + this.value = value; + } + + public Map getValue() + { + return value; + } + + public void update(Map newMap) + { + value.clear(); + value.putAll(newMap); + } + + @Override + public void write(DataOutput out) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public void readFields(DataInput in) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() + { + return Objects.hashCode(value); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + return Objects.equals(value, ((MapWritable) o).value); + + } + + @Override + public String toString() + { + return "MapWritable{value=" + value + '}'; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java new file mode 100644 index 000000000000..bd3414009d5b --- /dev/null +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/QueryBasedInputFormat.java @@ -0,0 +1,577 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexer.hadoop; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.base.Functions; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.metamx.common.Pair; +import com.metamx.common.StringUtils; +import com.metamx.common.lifecycle.Lifecycle; +import com.metamx.common.logger.Logger; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.HttpClientConfig; +import com.metamx.http.client.HttpClientInit; +import com.metamx.http.client.Request; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; +import io.druid.collections.CountingMap; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.jackson.DruidDefaultSerializersModule; +import io.druid.query.Druids; +import io.druid.query.LocatedSegmentDescriptor; +import io.druid.query.Result; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.dimension.ExtractionDimensionSpec; +import io.druid.query.extraction.TimeFormatExtractionFn; +import io.druid.query.filter.DimFilter; +import io.druid.query.select.EventHolder; +import io.druid.query.select.PagingSpec; +import io.druid.query.select.SelectQuery; +import io.druid.query.select.SelectResultValue; +import io.druid.segment.column.Column; +import io.druid.server.coordination.DruidServerMetadata; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.joda.time.Interval; + +import javax.ws.rs.core.MediaType; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.URL; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +public class QueryBasedInputFormat extends InputFormat + implements org.apache.hadoop.mapred.InputFormat +{ + protected static final Logger logger = new Logger(QueryBasedInputFormat.class); + + public static final String CONF_DRUID_BROKER_ADDRESS = "druid.broker.address"; + public static final String CONF_DRUID_DATASOURCE = "druid.datasource"; + public static final String CONF_DRUID_INTERVALS = "druid.intervals"; + public static final String CONF_DRUID_FILTERS = "druid.filters"; + + public static final String CONF_MAX_SPLIT_SIZE = "druid.max.split.size"; + public static final String CONF_SELECT_THRESHOLD = "druid.select.threshold"; + 
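+  // Illustrative client-side setup (the broker host and values here are assumptions, not part
+  // of this patch):
+  //   conf.set(CONF_DRUID_BROKER_ADDRESS, "http://broker-host:8082");
+  //   conf.set(CONF_DRUID_DATASOURCE, "wikipedia");
+  //   conf.set(CONF_DRUID_INTERVALS, "2016-01-01/2016-02-01");
+  // getInputSplits() below resolves these against the broker's /druid/v2/candidates endpoint.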
+ public static final String CONF_SELECT_COLUMNS = "hive.io.file.readcolumn.names"; + + public static final int DEFAULT_SELECT_THRESHOLD = 10000; + public static final int DEFAULT_MAX_SPLIT_SIZE = -1; // split per segment + + protected Configuration configure(Configuration configuration, ObjectMapper mapper) throws IOException + { + return configuration; + } + + @Override + public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits) throws IOException + { + return getInputSplits(job); + } + + @Override + public List getSplits(JobContext context) throws IOException, InterruptedException + { + return Arrays.asList(getInputSplits(context.getConfiguration())); + } + + protected DruidInputSplit[] getInputSplits(Configuration conf) throws IOException + { + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new DruidDefaultSerializersModule()); + + conf = configure(conf, mapper); + + String brokerAddress = Preconditions.checkNotNull(conf.get(CONF_DRUID_BROKER_ADDRESS), "Missing broker address"); + String dataSource = Preconditions.checkNotNull(conf.get(CONF_DRUID_DATASOURCE), "Missing datasource name"); + String intervals = Preconditions.checkNotNull(conf.get(CONF_DRUID_INTERVALS), "Missing interval"); + String filters = conf.get(CONF_DRUID_FILTERS); + + String requestURL = + String.format( + "%s/druid/v2/candidates?datasource=%s&intervals=%s", + brokerAddress, + dataSource, + URLEncoder.encode(intervals, StringUtils.UTF8_STRING) + ); + + + Lifecycle lifecycle = new Lifecycle(); + HttpClient client = HttpClientInit.createClient(HttpClientConfig.builder().build(), lifecycle); + + StatusResponseHolder response; + try { + lifecycle.start(); + response = client.go( + new Request(HttpMethod.GET, new URL(requestURL)), + new StatusResponseHandler(Charsets.UTF_8) + ).get(); + } + catch (Exception e) { + throw new IOException(e instanceof ExecutionException ? 
+
+    final List<LocatedSegmentDescriptor> segments = mapper.readValue(
+        response.getContent(),
+        new TypeReference<List<LocatedSegmentDescriptor>>()
+        {
+        }
+    );
+    if (segments == null || segments.size() == 0) {
+      throw new IllegalStateException("No segments found to read");
+    }
+
+    logger.info("segments to read [%s]", segments);
+
+    long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
+
+    if (maxSize > 0) {
+      // shuffle to spread co-located segments across groups, and raise the limit to the
+      // largest single segment so that every segment fits in some split
+      Collections.shuffle(segments);
+      for (LocatedSegmentDescriptor segment : segments) {
+        maxSize = Math.max(maxSize, segment.getSize());
+      }
+    }
+
+    List<DruidInputSplit> splits = Lists.newArrayList();
+
+    List<LocatedSegmentDescriptor> currentGroup = new ArrayList<>();
+    long currentSize = 0;
+
+    for (LocatedSegmentDescriptor segment : segments) {
+      if (maxSize < 0) {
+        splits.add(toSplit(dataSource, filters, Arrays.asList(segment)));
+        continue;
+      }
+      if (maxSize > 0 && currentSize + segment.getSize() > maxSize) {
+        splits.add(toSplit(dataSource, filters, currentGroup));
+        currentGroup.clear();
+        currentSize = 0;
+      }
+
+      currentGroup.add(segment);
+      currentSize += segment.getSize();
+    }
+
+    if (!currentGroup.isEmpty()) {
+      splits.add(toSplit(dataSource, filters, currentGroup));
+    }
+
+    logger.info("Number of splits [%d]", splits.size());
+    return splits.toArray(new DruidInputSplit[splits.size()]);
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.RecordReader<NullWritable, MapWritable> getRecordReader(
+      org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter
+  ) throws IOException
+  {
+    DruidRecordReader reader = new DruidRecordReader();
+    reader.initialize((DruidInputSplit) split, job);
+    return reader;
+  }
+
+  @Override
+  public RecordReader<NullWritable, MapWritable> createRecordReader(
+      InputSplit split,
+      TaskAttemptContext context
+  ) throws IOException, InterruptedException
+  {
+    return new DruidRecordReader();
+  }
+
+  private DruidInputSplit toSplit(String dataSource, String filters, List<LocatedSegmentDescriptor> segments)
+  {
+    long size = 0;
+    List<Interval> intervals = Lists.newArrayList();
+    for (LocatedSegmentDescriptor segment : segments) {
+      size += segment.getSize();
+      intervals.add(segment.getInterval());
+    }
+    String[] locations = getFrequentLocations(segments);
+    return new DruidInputSplit(dataSource, intervals, filters, locations, size);
+  }
+
+  private String[] getFrequentLocations(List<LocatedSegmentDescriptor> segments)
+  {
+    List<String> locations = Lists.newArrayList();
+    for (LocatedSegmentDescriptor segment : segments) {
+      for (DruidServerMetadata location : segment.getLocations()) {
+        locations.add(location.getHost());
+      }
+    }
+    return getMostFrequentLocations(locations);
+  }
+
+  private static String[] getMostFrequentLocations(Iterable<String> hosts)
+  {
+    final CountingMap<String> counter = new CountingMap<>();
+    for (String location : hosts) {
+      counter.add(location, 1);
+    }
+
+    final TreeSet<Pair<Long, String>> sorted = Sets.<Pair<Long, String>>newTreeSet(
+        new Comparator<Pair<Long, String>>()
+        {
+          @Override
+          public int compare(Pair<Long, String> o1, Pair<Long, String> o2)
+          {
+            int compare = o2.lhs.compareTo(o1.lhs); // descending
+            if (compare == 0) {
+              compare = o1.rhs.compareTo(o2.rhs); // ascending
+            }
+            return compare;
+          }
+        }
+    );
+
+    for (Map.Entry<String, AtomicLong> entry : counter.entrySet()) {
+      sorted.add(Pair.of(entry.getValue().get(), entry.getKey()));
+    }
+
+    // use default replication factor, if possible
+    final List<String> locations = Lists.newArrayListWithCapacity(3);
+    for (Pair<Long, String> frequent : Iterables.limit(sorted, 3)) {
+      locations.add(frequent.rhs);
+    }
+    return locations.toArray(new String[locations.size()]);
+  }
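+
+  /**
+   * Split covering one or more segments. Implements both the mapred and mapreduce split
+   * contracts and hand-rolls Writable serialization so it can be shipped to task nodes.
+   */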
+  public static final class DruidInputSplit extends InputSplit implements org.apache.hadoop.mapred.InputSplit
+  {
+    private String dataSource;
+    private String filters;
+    private List<Interval> intervals;
+    private String[] locations;
+    private long length;
+
+    // required for deserialization
+    public DruidInputSplit()
+    {
+    }
+
+    public DruidInputSplit(
+        String dataSource,
+        List<Interval> intervals,
+        String filters,
+        String[] locations,
+        long length
+    )
+    {
+      this.dataSource = dataSource;
+      this.intervals = intervals;
+      this.filters = filters;
+      this.locations = locations;
+      this.length = length;
+    }
+
+    @Override
+    public long getLength()
+    {
+      return length;
+    }
+
+    @Override
+    public String[] getLocations()
+    {
+      return locations;
+    }
+
+    public String getDataSource()
+    {
+      return dataSource;
+    }
+
+    public String getFilters()
+    {
+      return filters;
+    }
+
+    public List<Interval> getIntervals()
+    {
+      return intervals;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException
+    {
+      out.writeUTF(dataSource);
+      out.writeInt(intervals.size());
+      for (String interval : Lists.transform(intervals, Functions.toStringFunction())) {
+        out.writeUTF(interval);
+      }
+      out.writeUTF(Strings.nullToEmpty(filters));
+      out.writeInt(locations.length);
+      for (String location : locations) {
+        out.writeUTF(location);
+      }
+      out.writeLong(length);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException
+    {
+      dataSource = in.readUTF();
+      intervals = Lists.newArrayList();
+      for (int i = in.readInt(); i > 0; i--) {
+        intervals.add(new Interval(in.readUTF()));
+      }
+      filters = in.readUTF();
+      locations = new String[in.readInt()];
+      for (int i = 0; i < locations.length; i++) {
+        locations[i] = in.readUTF();
+      }
+      length = in.readLong();
+    }
+
+    @Override
+    public String toString()
+    {
+      return "DruidInputSplit{" +
+             "dataSource=" + dataSource +
+             ", intervals=" + intervals +
+             ", filters=" + filters +
+             ", locations=" + Arrays.toString(locations) +
+             '}';
+    }
+  }
+
+  public static class DruidRecordReader extends RecordReader<NullWritable, MapWritable>
+      implements org.apache.hadoop.mapred.RecordReader<NullWritable, MapWritable>
+  {
+    private final Lifecycle lifecycle = new Lifecycle();
+
+    private int threshold;
+    private ObjectMapper mapper;
+    private HttpClient client;
+    private Druids.SelectQueryBuilder builder;
+    private Request request;
+
+    private boolean finished;
+    private Iterator<EventHolder> events = Iterators.emptyIterator();
+    private Map<String, Integer> paging = null;
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
+    {
+      initialize((DruidInputSplit) split, context.getConfiguration());
+    }
+
+    public void initialize(DruidInputSplit split, Configuration configuration) throws IOException
+    {
+      logger.info("Start loading " + split);
+
+      String location = split.getLocations()[0];
+      String dataSource = split.getDataSource();
+      List<Interval> intervals = split.getIntervals();
+
+      client = HttpClientInit.createClient(HttpClientConfig.builder().build(), lifecycle);
+
+      mapper = new DefaultObjectMapper();
+      threshold = configuration.getInt(CONF_SELECT_THRESHOLD, DEFAULT_SELECT_THRESHOLD);
+
+      builder = new Druids.SelectQueryBuilder()
+          .dataSource(dataSource)
+          .intervals(intervals)
+          .granularity(QueryGranularity.ALL);
+
+      List<DimensionSpec> dimensionSpecs = Lists.newArrayList();
+      for (String column : configuration.get(CONF_SELECT_COLUMNS).split(",")) {
+        column = column.trim();
+        if (column.equals(Column.TIME_COLUMN_NAME)) {
+          // render the __time column as an ISO timestamp rather than a raw long
+          dimensionSpecs.add(
+              new ExtractionDimensionSpec(
+                  column,
+                  column,
+                  new TimeFormatExtractionFn("yyyy-MM-dd'T'HH:mm:ss'Z'", null, null)
+              )
+          );
+        } else {
+          dimensionSpecs.add(new DefaultDimensionSpec(column, column));
+        }
+      }
+
+      builder.dimensionSpecs(dimensionSpecs);
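+
+      // filters is expected to hold a JSON-serialized DimFilter, e.g. (illustrative):
+      //   {"type":"selector","dimension":"page","value":"Druid"}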
+      String filters = split.getFilters();
+      if (filters != null && !filters.isEmpty()) {
+        builder.filters(mapper.readValue(filters, DimFilter.class));
+      }
+
+      request = new Request(
+          HttpMethod.POST,
+          new URL(String.format("%s/druid/v2", "http://" + location))
+      );
+      try {
+        lifecycle.start();
+      }
+      catch (Exception e) {
+        throw new IOException(e);
+      }
+
+      if (logger.isInfoEnabled()) {
+        logger.info("Retrieving from druid using query.. " + nextQuery());
+      }
+    }
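+
+    // Pagination sketch: each response's paging identifiers are fed into the next query's
+    // PagingSpec, so the reader walks the split in pages of at most `threshold` events and
+    // stops when a page comes back empty.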
+    private void nextPage() throws IOException, InterruptedException
+    {
+      StatusResponseHolder response;
+      try {
+        response = client.go(
+            request.setContent(mapper.writeValueAsBytes(nextQuery()))
+                   .setHeader(
+                       HttpHeaders.Names.CONTENT_TYPE,
+                       MediaType.APPLICATION_JSON
+                   ),
+            new StatusResponseHandler(Charsets.UTF_8)
+        ).get();
+      }
+      catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      }
+
+      HttpResponseStatus status = response.getStatus();
+      if (!status.equals(HttpResponseStatus.OK)) {
+        throw new RuntimeException(response.getContent());
+      }
+
+      List<Result<SelectResultValue>> value = mapper.readValue(
+          response.getContent(),
+          new TypeReference<List<Result<SelectResultValue>>>()
+          {
+          }
+      );
+      if (!value.isEmpty()) {
+        SelectResultValue result = value.get(0).getValue();
+        events = result.iterator();
+        paging = result.getPagingIdentifiers();
+      } else {
+        events = Iterators.emptyIterator();
+        finished = true;
+      }
+    }
+
+    private SelectQuery nextQuery()
+    {
+      PagingSpec pagingSpec = new PagingSpec(paging, threshold, true);
+      return builder.pagingSpec(pagingSpec).build();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException
+    {
+      if (!finished && !events.hasNext()) {
+        nextPage();
+      }
+      return events.hasNext();
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException
+    {
+      return NullWritable.get();
+    }
+
+    @Override
+    public MapWritable getCurrentValue() throws IOException, InterruptedException
+    {
+      return new MapWritable(events.next().getEvent());
+    }
+
+    @Override
+    public float getProgress() throws IOException
+    {
+      return finished ? 1 : 0;
+    }
+
+    @Override
+    public NullWritable createKey()
+    {
+      return NullWritable.get();
+    }
+
+    @Override
+    public MapWritable createValue()
+    {
+      return new MapWritable();
+    }
+
+    @Override
+    public boolean next(NullWritable key, MapWritable value) throws IOException
+    {
+      try {
+        if (nextKeyValue()) {
+          value.update(events.next().getEvent());
+          return true;
+        }
+      }
+      catch (InterruptedException e) {
+        // ignore
+      }
+      return false;
+    }
+
+    @Override
+    public long getPos() throws IOException
+    {
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      lifecycle.stop();
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index e011e0762ce6..23a557c6b641 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
         <module>extensions-core/postgresql-metadata-storage</module>
         <module>extensions-core/namespace-lookup</module>
         <module>extensions-core/s3-extensions</module>
+        <module>extensions-core/hive-extensions</module>
 
         <module>extensions-contrib/azure-extensions</module>
         <module>extensions-contrib/cassandra-storage</module>
diff --git a/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java b/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java
index 8b0aaae6ced2..978bcb42f15a 100644
--- a/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java
+++ b/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java
@@ -139,9 +139,9 @@ public static void setup() throws IOException
     segment_override = new IncrementalIndexSegment(index2, makeIdentifier(index2, "v2"));
 
     VersionedIntervalTimeline<String, Segment> timeline = new VersionedIntervalTimeline<>(StringComparators.LEXICOGRAPHIC);
-    timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk<Segment>(segment0));
-    timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk<Segment>(segment1));
-    timeline.add(index2.getInterval(), "v2", new SingleElementPartitionChunk<Segment>(segment_override));
+    timeline.add(index0.getInterval(), "v1", new SingleElementPartitionChunk<Segment>(segment0), -1);
+    timeline.add(index1.getInterval(), "v1", new SingleElementPartitionChunk<Segment>(segment1), -1);
+    timeline.add(index2.getInterval(), "v2", new SingleElementPartitionChunk<Segment>(segment_override), -1);
 
     segmentIdentifiers = Lists.newArrayList();
    for (TimelineObjectHolder<String, Segment> holder : timeline.lookup(new Interval("2011-01-12/2011-01-14"))) {
diff --git a/processing/src/test/java/io/druid/segment/SchemalessIndex.java b/processing/src/test/java/io/druid/segment/SchemalessIndex.java
index ab8e2019ab3a..34863d9754ae 100644
--- a/processing/src/test/java/io/druid/segment/SchemalessIndex.java
+++ b/processing/src/test/java/io/druid/segment/SchemalessIndex.java
@@ -448,7 +448,7 @@ private static QueryableIndex makeAppendedMMappedIndex(
     ShardSpec noneShardSpec = new NoneShardSpec();
 
     for (int i = 0; i < intervals.size(); i++) {
-      timeline.add(intervals.get(i), i, noneShardSpec.createChunk(filesToMap.get(i)));
+      timeline.add(intervals.get(i), i, noneShardSpec.createChunk(filesToMap.get(i)), -1);
     }
 
     final List<IndexableAdapter> adapters = Lists.newArrayList(
diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java
index 95826bb60ca5..2b2a43cafd94 100644
--- a/server/src/main/java/io/druid/client/BrokerServerView.java
+++ b/server/src/main/java/io/druid/client/BrokerServerView.java
@@ -229,7 +229,12 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm
        timelines.put(segment.getDataSource(), timeline);
      }
 
-      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
+      timeline.add(
+          segment.getInterval(),
+          segment.getVersion(),
+          segment.getShardSpec().createChunk(selector),
+          segment.getSize()
+      );
       selectors.put(segmentId, selector);
     }
diff --git a/server/src/main/java/io/druid/client/CoordinatorServerView.java b/server/src/main/java/io/druid/client/CoordinatorServerView.java
index e61d7a309dc3..1b076cec17ee 100644
--- a/server/src/main/java/io/druid/client/CoordinatorServerView.java
+++ b/server/src/main/java/io/druid/client/CoordinatorServerView.java
@@ -141,7 +141,8 @@ private void serverAddedSegment(final DruidServerMetadata server, final DataSegm
       timeline.add(
           segment.getInterval(),
           segment.getVersion(),
-          segment.getShardSpec().createChunk(segmentLoadInfo)
+          segment.getShardSpec().createChunk(segmentLoadInfo),
+          segment.getSize()
       );
       segmentLoadInfos.put(segmentId, segmentLoadInfo);
     }
diff --git a/server/src/main/java/io/druid/client/selector/ServerSelector.java b/server/src/main/java/io/druid/client/selector/ServerSelector.java
index 3a4f58cfb8ba..5354aefd9a41 100644
--- a/server/src/main/java/io/druid/client/selector/ServerSelector.java
+++ b/server/src/main/java/io/druid/client/selector/ServerSelector.java
@@ -19,10 +19,14 @@
 
 package io.druid.client.selector;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.emitter.EmittingLogger;
+import io.druid.client.DruidServer;
+import io.druid.server.coordination.DruidServerMetadata;
 import io.druid.timeline.DataSegment;
 
+import java.util.List;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
@@ -78,6 +82,16 @@ public boolean isEmpty()
     }
   }
 
+  public List<DruidServerMetadata> getCandidates() {
+    List<DruidServerMetadata> result = Lists.newArrayList();
+    synchronized (this) {
+      for (QueryableDruidServer server : servers) {
+        result.add(server.getServer().getMetadata());
+      }
+    }
+    return result;
+  }
+
   public QueryableDruidServer pick()
   {
     synchronized (this) {
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index db64cb6fa6df..74ba9bf74721 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -250,7 +250,12 @@ private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalsWi
           DataSegment.class
       );
 
-      timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+      timeline.add(
+          segment.getInterval(),
+          segment.getVersion(),
+          segment.getShardSpec().createChunk(segment),
+          segment.getSize()
+      );
     }
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
index 0dbeadbd1759..153680e77a88 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataSegmentManager.java
@@ -54,7 +54,6 @@
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
-import org.skife.jdbi.v2.exceptions.TransactionFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.tweak.ResultSetMapper;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
@@ -229,7 +228,8 @@ public VersionedIntervalTimeline<String, DataSegment> fold(
         timeline.add(
             segment.getInterval(),
             segment.getVersion(),
-            segment.getShardSpec().createChunk(segment)
+            segment.getShardSpec().createChunk(segment),
+            segment.getSize()
         );
 
         return timeline;
diff --git a/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java
new file mode 100644
index 000000000000..c5aac03d79dd
--- /dev/null
+++ b/server/src/main/java/io/druid/query/LocatedSegmentDescriptor.java
@@ -0,0 +1,97 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import io.druid.server.coordination.DruidServerMetadata;
+import org.joda.time.Interval;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A SegmentDescriptor enriched with the segment's approximate size in bytes and the
+ * servers currently hosting it.
+ */
+public class LocatedSegmentDescriptor extends SegmentDescriptor
+{
+  private final long size;
+  private final List<DruidServerMetadata> locations;
+
+  @JsonCreator
+  public LocatedSegmentDescriptor(
+      @JsonProperty("itvl") Interval interval,
+      @JsonProperty("ver") String version,
+      @JsonProperty("part") int partitionNumber,
+      @JsonProperty("size") long size,
+      @JsonProperty("locations") List<DruidServerMetadata> locations
+  )
+  {
+    super(interval, version, partitionNumber);
+    this.size = size;
+    this.locations = locations == null ? ImmutableList.<DruidServerMetadata>of() : locations;
+  }
+
+  public LocatedSegmentDescriptor(SegmentDescriptor descriptor, long size, List<DruidServerMetadata> candidates)
+  {
+    this(descriptor.getInterval(), descriptor.getVersion(), descriptor.getPartitionNumber(), size, candidates);
+  }
+
+  @JsonProperty("size")
+  public long getSize()
+  {
+    return size;
+  }
+
+  @JsonProperty("locations")
+  public List<DruidServerMetadata> getLocations()
+  {
+    return locations;
+  }
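+
+  // Note: value equality is the parent descriptor's equality plus the *set* of hosting
+  // servers' host names, so the ordering of locations is irrelevant.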
+  @Override
+  public boolean equals(Object o)
+  {
+    if (!(o instanceof LocatedSegmentDescriptor) || !super.equals(o)) {
+      return false;
+    }
+
+    LocatedSegmentDescriptor other = (LocatedSegmentDescriptor) o;
+    return getHostNames().equals(other.getHostNames());
+  }
+
+  private Set<String> getHostNames()
+  {
+    Set<String> hostNames = Sets.newHashSet();
+    for (DruidServerMetadata meta : locations) {
+      hostNames.add(meta.getHost());
+    }
+    return hostNames;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = super.hashCode();
+    result = 31 * result + getHostNames().hashCode();
+    return result;
+  }
+}
diff --git a/server/src/main/java/io/druid/server/BrokerQueryResource.java b/server/src/main/java/io/druid/server/BrokerQueryResource.java
new file mode 100644
index 000000000000..85f61dc5c3cd
--- /dev/null
+++ b/server/src/main/java/io/druid/server/BrokerQueryResource.java
@@ -0,0 +1,166 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.server;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.metamx.common.guava.Sequences;
+import com.metamx.emitter.service.ServiceEmitter;
+import io.druid.client.TimelineServerView;
+import io.druid.client.selector.ServerSelector;
+import io.druid.common.utils.JodaUtils;
+import io.druid.guice.annotations.Json;
+import io.druid.guice.annotations.Smile;
+import io.druid.query.DataSource;
+import io.druid.query.LocatedSegmentDescriptor;
+import io.druid.query.Query;
+import io.druid.query.QuerySegmentWalker;
+import io.druid.query.SegmentDescriptor;
+import io.druid.query.TableDataSource;
+import io.druid.server.coordination.DruidServerMetadata;
+import io.druid.server.initialization.ServerConfig;
+import io.druid.server.log.RequestLogger;
+import io.druid.timeline.TimelineLookup;
+import io.druid.timeline.TimelineObjectHolder;
+import io.druid.timeline.partition.PartitionChunk;
+import org.joda.time.Interval;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * QueryResource variant for the broker which, in addition to serving queries, exposes
+ * the segment locations a query would hit.
+ */
+@Path("/druid/v2/")
+public class BrokerQueryResource extends QueryResource
+{
+  private final TimelineServerView brokerServerView;
+
+  @Inject
+  public BrokerQueryResource(
+      ServerConfig config,
+      @Json ObjectMapper jsonMapper,
+      @Smile ObjectMapper smileMapper,
+      QuerySegmentWalker texasRanger,
+      ServiceEmitter emitter,
+      RequestLogger requestLogger,
+      QueryManager queryManager,
+      TimelineServerView brokerServerView
+  )
+  {
+    super(config, jsonMapper, smileMapper, texasRanger, emitter, requestLogger, queryManager);
+    this.brokerServerView = brokerServerView;
+  }
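+
+  /**
+   * Both /candidates endpoints answer the question "which segments, of what size, hosted
+   * on which servers, would this query (or datasource/interval pair) touch?" without
+   * actually running the query.
+   */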
+  @POST
+  @Path("/candidates")
+  @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE})
+  @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE})
+  public Response getQueryTargets(
+      InputStream in,
+      @QueryParam("pretty") String pretty,
+      @Context final HttpServletRequest req // used only to get request content-type and remote address
+  ) throws IOException
+  {
+    final String reqContentType = req.getContentType();
+    final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType)
+                            || APPLICATION_SMILE.equals(reqContentType);
+    final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON;
+
+    ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
+    final ObjectWriter jsonWriter = pretty != null
+                                    ? objectMapper.writerWithDefaultPrettyPrinter()
+                                    : objectMapper.writer();
+
+    try {
+      Query<?> query = objectMapper.readValue(in, Query.class);
+      List<LocatedSegmentDescriptor> located = getTargetLocations(query.getDataSource(), query.getIntervals());
+      if (located == null || located.isEmpty()) {
+        return Response.ok(Sequences.empty()).build();
+      }
+      return Response.ok(jsonWriter.writeValueAsString(located), contentType).build();
+    }
+    catch (Exception e) {
+      return Response.serverError().type(contentType).entity(
+          jsonWriter.writeValueAsString(
+              ImmutableMap.of(
+                  "error", e.getMessage() == null ? "null exception" : e.getMessage()
+              )
+          )
+      ).build();
+    }
+  }
+
+  @GET
+  @Path("/candidates")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getQueryTargets(
+      @QueryParam("datasource") String datasource,
+      @QueryParam("intervals") String intervals
+  ) throws IOException
+  {
+    List<Interval> intervalList = Lists.newArrayList();
+    for (String interval : intervals.split(",")) {
+      intervalList.add(Interval.parse(interval.trim()));
+    }
+    List<LocatedSegmentDescriptor> located = getTargetLocations(
+        new TableDataSource(datasource),
+        JodaUtils.condenseIntervals(intervalList)
+    );
+    return Response.ok(located).build();
+  }
+
+  private List<LocatedSegmentDescriptor> getTargetLocations(DataSource datasource, List<Interval> intervals)
+  {
+    TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(datasource);
+    if (timeline == null) {
+      return Collections.emptyList();
+    }
+    List<LocatedSegmentDescriptor> located = Lists.newArrayList();
+    for (Interval interval : intervals) {
+      for (TimelineObjectHolder<String, ServerSelector> holder : timeline.lookup(interval)) {
+        for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
+          ServerSelector selector = chunk.getObject();
+          final SegmentDescriptor descriptor = new SegmentDescriptor(
+              holder.getInterval(), holder.getVersion(), chunk.getChunkNumber()
+          );
+          List<DruidServerMetadata> candidates = selector.getCandidates();
+          located.add(new LocatedSegmentDescriptor(descriptor, holder.getApproximatedSize(), candidates));
+        }
+      }
+    }
+    return located;
+  }
+}
diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java
index 0b9ac2b0fa50..04c2b9061447 100644
--- a/server/src/main/java/io/druid/server/QueryResource.java
+++ b/server/src/main/java/io/druid/server/QueryResource.java
@@ -68,19 +68,19 @@
 @Path("/druid/v2/")
 public class QueryResource
 {
-  private static final EmittingLogger log = new EmittingLogger(QueryResource.class);
+  protected static final EmittingLogger log = new EmittingLogger(QueryResource.class);
 
   @Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE
-  private static final String APPLICATION_SMILE = "application/smile";
+  protected static final String APPLICATION_SMILE = "application/smile";
 
-  private static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7*1024;
+  protected static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7*1024;
 
-  private final ServerConfig config;
-  private final ObjectMapper jsonMapper;
-  private final ObjectMapper smileMapper;
-  private final QuerySegmentWalker texasRanger;
-  private final ServiceEmitter emitter;
-  private final RequestLogger requestLogger;
-  private final QueryManager queryManager;
+  protected final ServerConfig config;
+  protected final ObjectMapper jsonMapper;
+  protected final ObjectMapper smileMapper;
+  protected final QuerySegmentWalker texasRanger;
+  protected final ServiceEmitter emitter;
+  protected final RequestLogger requestLogger;
+  protected final QueryManager queryManager;
 
   @Inject
   public QueryResource(
diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java
index 4b7c218aa167..07f9d6c86f67 100644
--- a/server/src/main/java/io/druid/server/coordination/ServerManager.java
+++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java
@@ -186,7 +186,8 @@ public boolean loadSegment(final DataSegment segment) throws SegmentLoadingExcep
       loadedIntervals.add(
           segment.getInterval(),
           segment.getVersion(),
-          segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter))
+          segment.getShardSpec().createChunk(new ReferenceCountingSegment(adapter)),
+          segment.getSize()
       );
       synchronized (dataSourceSizes) {
         dataSourceSizes.add(dataSource, segment.getSize());
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java
index 20bb23d6e417..c3b71c3cdfe8 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowed.java
@@ -68,7 +68,10 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
 
       for (DataSegment segment : dataSource.getSegments()) {
         timeline.add(
-            segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)
+            segment.getInterval(),
+            segment.getVersion(),
+            segment.getShardSpec().createChunk(segment),
+            segment.getSize()
         );
       }
     }
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java
index d9fdbe894154..1c5ca7f66173 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentMerger.java
@@ -89,7 +89,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         timeline.add(
             dataSegment.getInterval(),
             dataSegment.getVersion(),
-            dataSegment.getShardSpec().createChunk(dataSegment)
+            dataSegment.getShardSpec().createChunk(dataSegment),
+            dataSegment.getSize()
         );
       }
     }
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
index 99d30caec6be..c468ef494f32 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -164,7 +164,7 @@ public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> pri
             }
           }
         )
-    ));
+    ), -1);
   }
 
   protected CachingClusteredClient makeClient(final ListeningExecutorService backgroundExecutorService)
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 4f09b05cfcfc..2bb0ad520cee 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -559,7 +559,7 @@ public void testCachingOverBulkLimitEnforcesLimit() throws Exception
         new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
     );
     selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), dataSegment);
-    timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector));
+    timeline.add(interval, "v", new SingleElementPartitionChunk<>(selector), -1);
 
     client.run(query, context);
 
@@ -1734,7 +1734,7 @@ private List<Map<DruidServer, ServerExpectations>> populateTimeline(
           }
           chunk = new StringPartitionChunk<>(start, end, j, selector);
         }
-        timeline.add(queryIntervals.get(k), String.valueOf(k), chunk);
+        timeline.add(queryIntervals.get(k), String.valueOf(k), chunk, -1);
       }
     }
     return serverExpectationList;
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java
index 2e108fc45a23..e21c66115623 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java
@@ -278,7 +278,8 @@ public Set<DataSegment> findUsedSegments(Set<SegmentIdentifier> identifiers) thr
       timeline.add(
          dataSegment.getInterval(),
          dataSegment.getVersion(),
-          dataSegment.getShardSpec().createChunk(dataSegment)
+          dataSegment.getShardSpec().createChunk(dataSegment),
+          dataSegment.getSize()
       );
     }
diff --git a/server/src/test/java/io/druid/server/shard/NumberedShardSpecTest.java b/server/src/test/java/io/druid/server/shard/NumberedShardSpecTest.java
index 33187c17a984..d9a2d3033180 100644
--- a/server/src/test/java/io/druid/server/shard/NumberedShardSpecTest.java
+++ b/server/src/test/java/io/druid/server/shard/NumberedShardSpecTest.java
@@ -186,7 +186,7 @@ private void testVersionedIntervalTimelineBehaviorForNumberedShardSpec(
     Interval interval = new Interval("2000/3000");
     String version = "v1";
     for (PartitionChunk<String> chunk : chunks) {
-      timeline.add(interval, version, chunk);
+      timeline.add(interval, version, chunk, -1);
     }
 
     Set<String> actualObjects = new HashSet<>();
diff --git a/services/src/main/java/io/druid/cli/CliBroker.java b/services/src/main/java/io/druid/cli/CliBroker.java
index 7994bd541e5a..95c25b6db0eb 100644
--- a/services/src/main/java/io/druid/cli/CliBroker.java
+++ b/services/src/main/java/io/druid/cli/CliBroker.java
@@ -44,9 +44,9 @@
 import io.druid.query.QueryToolChestWarehouse;
 import io.druid.query.RetryQueryRunnerConfig;
 import io.druid.query.lookup.LookupModule;
+import io.druid.server.BrokerQueryResource;
 import io.druid.server.ClientInfoResource;
 import io.druid.server.ClientQuerySegmentWalker;
-import io.druid.server.QueryResource;
 import io.druid.server.coordination.broker.DruidBroker;
 import io.druid.server.http.BrokerResource;
 import io.druid.server.initialization.jetty.JettyServerInitializer;
@@ -103,10 +103,10 @@ public void configure(Binder binder)
       binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);
       binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
 
-      Jerseys.addResource(binder, QueryResource.class);
+      Jerseys.addResource(binder, BrokerQueryResource.class);
       Jerseys.addResource(binder, BrokerResource.class);
       Jerseys.addResource(binder, ClientInfoResource.class);
-      LifecycleModule.register(binder, QueryResource.class);
+      LifecycleModule.register(binder, BrokerQueryResource.class);
       LifecycleModule.register(binder, DruidBroker.class);
 
       MetricsModule.register(binder, CacheMonitor.class);