diff --git a/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java b/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java index 9380d102b792..4e9c6c5d4ace 100644 --- a/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/TimestampSpec.java @@ -88,15 +88,19 @@ public DateTime getMissingValue() public DateTime extractTimestamp(Map input) { - final Object o = input.get(timestampColumn); + return parseDateTime(input.get(timestampColumn)); + } + + public DateTime parseDateTime(Object input) + { DateTime extracted = missingValue; - if (o != null) { - if (o.equals(parseCtx.lastTimeObject)) { + if (input != null) { + if (input.equals(parseCtx.lastTimeObject)) { extracted = parseCtx.lastDateTime; } else { ParseCtx newCtx = new ParseCtx(); - newCtx.lastTimeObject = o; - extracted = timestampConverter.apply(o); + newCtx.lastTimeObject = input; + extracted = timestampConverter.apply(input); newCtx.lastDateTime = extracted; parseCtx = newCtx; } diff --git a/docs/content/development/extensions-contrib/time-min-max.md b/docs/content/development/extensions-contrib/time-min-max.md new file mode 100644 index 000000000000..ab853fed9b07 --- /dev/null +++ b/docs/content/development/extensions-contrib/time-min-max.md @@ -0,0 +1,85 @@ +--- +layout: doc_page +--- + +# Timestamp Min/Max aggregators + +To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-time-min-max`. + +These aggregators enable more precise calculation of min and max time of given events than `__time` column whose granularity is sparse, the same as query granularity. +To use this feature, a "timeMin" or "timeMax" aggregator must be included at indexing time. +They can apply to any columns that can be converted to timestamp, which include Long, DateTime, Timestamp, and String types. + +For example, when a data set consists of timestamp, dimension, and metric value like followings. 
+
+```
+2015-07-28T01:00:00.000Z A 1
+2015-07-28T02:00:00.000Z A 1
+2015-07-28T03:00:00.000Z A 1
+2015-07-28T04:00:00.000Z B 1
+2015-07-28T05:00:00.000Z A 1
+2015-07-28T06:00:00.000Z B 1
+2015-07-29T01:00:00.000Z C 1
+2015-07-29T02:00:00.000Z C 1
+2015-07-29T03:00:00.000Z A 1
+2015-07-29T04:00:00.000Z A 1
+```
+
+At ingestion time, the timeMin and timeMax aggregators can be included like any other aggregator.
+
+```json
+{
+  "type": "timeMin",
+  "name": "tmin",
+  "fieldName": "<field_name, typically the column specified in the timestamp spec>"
+}
+```
+
+```json
+{
+  "type": "timeMax",
+  "name": "tmax",
+  "fieldName": "<field_name, typically the column specified in the timestamp spec>"
+}
+```
+
+`name` is the output name of the aggregator and can be any string. `fieldName` is typically the column specified in the timestamp spec, but it can be any column that can be converted to a timestamp.
+
+To query for results, the same aggregator types "timeMin" and "timeMax" are used.
+
+```json
+{
+  "queryType": "groupBy",
+  "dataSource": "timeMinMax",
+  "granularity": "DAY",
+  "dimensions": ["product"],
+  "aggregations": [
+    {
+      "type": "count",
+      "name": "count"
+    },
+    {
+      "type": "timeMin",
+      "name": "<output_name_of_timeMin>",
+      "fieldName": "tmin"
+    },
+    {
+      "type": "timeMax",
+      "name": "<output_name_of_timeMax>",
+      "fieldName": "tmax"
+    }
+  ],
+  "intervals": [
+    "2010-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"
+  ]
+}
+```
+
+Then, the result contains the min and max of the timestamp, which are finer than the query granularity.
+ +``` +2015-07-28T00:00:00.000Z A 4 2015-07-28T01:00:00.000Z 2015-07-28T05:00:00.000Z +2015-07-28T00:00:00.000Z B 2 2015-07-28T04:00:00.000Z 2015-07-28T06:00:00.000Z +2015-07-29T00:00:00.000Z A 2 2015-07-29T03:00:00.000Z 2015-07-29T04:00:00.000Z +2015-07-29T00:00:00.000Z C 2 2015-07-29T01:00:00.000Z 2015-07-29T02:00:00.000Z +``` \ No newline at end of file diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md index 23eba76e22f6..ecef97f5824e 100644 --- a/docs/content/development/extensions.md +++ b/docs/content/development/extensions.md @@ -58,6 +58,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c |druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-contrib/parquet.html)| |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)| |druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)| +|druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.html)| |sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)| |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)| |statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)| diff --git a/extensions-contrib/time-min-max/pom.xml b/extensions-contrib/time-min-max/pom.xml new file mode 100644 index 000000000000..03b72a5ccc75 --- /dev/null +++ b/extensions-contrib/time-min-max/pom.xml @@ -0,0 +1,79 @@ + + + + + druid + io.druid + 0.9.3-SNAPSHOT + ../../pom.xml + + 4.0.0 + + io.druid.extensions.contrib + druid-time-min-max + druid-time-min-max + Min/Max of timestamp + + + + io.druid + druid-api + ${project.parent.version} + provided + + + io.druid + druid-processing + ${project.parent.version} + 
provided + + + io.druid + druid-common + ${project.parent.version} + provided + + + junit + junit + test + + + org.easymock + easymock + test + + + io.druid + druid-processing + ${project.parent.version} + test-jar + test + + + io.druid + druid-server + ${project.parent.version} + test + + + diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java new file mode 100644 index 000000000000..4d9d59bae838 --- /dev/null +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregator.java @@ -0,0 +1,111 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation; + +import io.druid.data.input.impl.TimestampSpec; +import io.druid.segment.ObjectColumnSelector; + +import java.util.Comparator; + +public class TimestampAggregator implements Aggregator +{ + static final Comparator COMPARATOR = LongMaxAggregator.COMPARATOR; + + static long combineValues(Object lhs, Object rhs) + { + return Math.max(((Number)lhs).longValue(), ((Number)rhs).longValue()); + } + + private final ObjectColumnSelector selector; + private final String name; + private final TimestampSpec timestampSpec; + private final Comparator comparator; + private final Long initValue; + + private long most; + + public TimestampAggregator( + String name, + ObjectColumnSelector selector, + TimestampSpec timestampSpec, + Comparator comparator, + Long initValue + ) + { + this.name = name; + this.selector = selector; + this.timestampSpec = timestampSpec; + this.comparator = comparator; + this.initValue = initValue; + + reset(); + } + + @Override + public void aggregate() { + Long value = TimestampAggregatorFactory.convertLong(timestampSpec, selector.get()); + + if (value != null) { + most = comparator.compare(most, value) > 0 ? 
most : value; + } + } + + @Override + public void reset() + { + most = initValue; + } + + @Override + public Object get() + { + return most; + } + + @Override + public float getFloat() + { + return (float) most; + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + // no resource to cleanup + } + + @Override + public long getLong() + { + return most; + } + + @Override + public Aggregator clone() + { + return new TimestampAggregator(name, selector, timestampSpec, comparator, initValue); + } +} diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java new file mode 100644 index 000000000000..ae62b7678301 --- /dev/null +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampAggregatorFactory.java @@ -0,0 +1,227 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.primitives.Longs; +import io.druid.common.utils.StringUtils; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.segment.ColumnSelectorFactory; +import org.joda.time.DateTime; + +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +public class TimestampAggregatorFactory extends AggregatorFactory +{ + private static final byte CACHE_TYPE_ID = 31; + + final String name; + final String fieldName; + final String timeFormat; + private final Comparator comparator; + private final Long initValue; + + private TimestampSpec timestampSpec; + + TimestampAggregatorFactory( + String name, + String fieldName, + String timeFormat, + Comparator comparator, + Long initValue + ) + { + this.name = name; + this.fieldName = fieldName; + this.timeFormat = timeFormat; + this.comparator = comparator; + this.initValue = initValue; + + this.timestampSpec = new TimestampSpec(fieldName, timeFormat, null); + } + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + return new TimestampAggregator(name, metricFactory.makeObjectColumnSelector(fieldName), timestampSpec, comparator, initValue); + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + return new TimestampBufferAggregator(metricFactory.makeObjectColumnSelector(fieldName), timestampSpec, comparator, initValue); + } + + @Override + public Comparator getComparator() + { + return TimestampAggregator.COMPARATOR; + } + + @Override + public Object combine(Object lhs, Object rhs) + { + return TimestampAggregator.combineValues(lhs, rhs); + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return new TimestampAggregatorFactory(name, name, timeFormat, comparator, initValue); + } + + @Override + 
public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @Override + public List getRequiredColumns() + { + return Arrays.asList(new TimestampAggregatorFactory(fieldName, fieldName, timeFormat, comparator, initValue)); + } + + @Override + public Object deserialize(Object object) + { + return object; + } + + @Override + public Object finalizeComputation(Object object) + { + return new DateTime((long)object); + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @JsonProperty + public String getTimeFormat() + { + return timeFormat; + } + + @Override + public List requiredFields() + { + return Arrays.asList(fieldName); + } + + @Override + public byte[] getCacheKey() + { + byte[] fieldNameBytes = StringUtils.toUtf8(fieldName); + + return ByteBuffer.allocate(1 + fieldNameBytes.length) + .put(CACHE_TYPE_ID).put(fieldNameBytes).array(); + } + + @Override + public String getTypeName() + { + return "long"; + } + + @Override + public int getMaxIntermediateSize() + { + return Longs.BYTES; + } + + @Override + public Object getAggregatorStartValue() + { + return initValue; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TimestampAggregatorFactory that = (TimestampAggregatorFactory) o; + + if (!Objects.equals(fieldName, that.fieldName)) { + return false; + } + if (!Objects.equals(name, that.name)) { + return false; + } + if (!Objects.equals(comparator, that.comparator)) { + return false; + } + if (!Objects.equals(initValue, that.initValue)) { + return false; + } + + return true; + } + + 
@Override + public int hashCode() + { + int result = fieldName != null ? fieldName.hashCode() : 0; + result = 31 * result + (name != null ? name.hashCode() : 0); + result = 31 * result + (comparator != null ? comparator.hashCode() : 0); + result = 31 * result + (initValue != null ? initValue.hashCode() : 0); + return result; + } + + static Long convertLong(TimestampSpec timestampSpec, Object input) + { + if (input instanceof Number) { + return ((Number)input).longValue(); + } else if (input instanceof DateTime) { + return ((DateTime)input).getMillis(); + } else if (input instanceof Timestamp) { + return ((Timestamp)input).getTime(); + } else if (input instanceof String) { + return timestampSpec.parseDateTime(input).getMillis(); + } + + return null; + } +} diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampBufferAggregator.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampBufferAggregator.java new file mode 100644 index 000000000000..e195005e1f29 --- /dev/null +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampBufferAggregator.java @@ -0,0 +1,85 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import io.druid.data.input.impl.TimestampSpec; +import io.druid.segment.ObjectColumnSelector; + +import java.nio.ByteBuffer; +import java.util.Comparator; + +public class TimestampBufferAggregator implements BufferAggregator +{ + private final ObjectColumnSelector selector; + private final TimestampSpec timestampSpec; + private final Comparator comparator; + private final Long initValue; + + public TimestampBufferAggregator( + ObjectColumnSelector selector, + TimestampSpec timestampSpec, + Comparator comparator, + Long initValue) + { + this.selector = selector; + this.timestampSpec = timestampSpec; + this.comparator = comparator; + this.initValue = initValue; + } + + @Override + public void init(ByteBuffer buf, int position) + { + buf.putLong(position, initValue); + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + Long newTime = TimestampAggregatorFactory.convertLong(timestampSpec, selector.get()); + if (newTime != null) { + Long prev = buf.getLong(position); + buf.putLong(position, comparator.compare(prev, newTime) > 0 ? 
prev: newTime); + } + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return buf.getLong(position); + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + return (float)buf.getLong(position); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + return buf.getLong(position); + } + + @Override + public void close() { + + } +} diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampMaxAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampMaxAggregatorFactory.java new file mode 100644 index 000000000000..3233e645bdf8 --- /dev/null +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampMaxAggregatorFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Comparator; + +public class TimestampMaxAggregatorFactory extends TimestampAggregatorFactory +{ + @JsonCreator + public TimestampMaxAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("timeFormat") String timeFormat + ) + { + super(name, fieldName, timeFormat, new Comparator() { + @Override + public int compare(Long o1, Long o2) { + return Long.compare(o1, o2); + } + }, Long.MIN_VALUE); + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + } + + @Override + public String toString() + { + return "TimestampMaxAggregatorFactory{" + + "fieldName='" + fieldName + '\'' + + ", name='" + name + '\'' + + '}'; + } +} diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampMinAggregatorFactory.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampMinAggregatorFactory.java new file mode 100644 index 000000000000..00c40fba5140 --- /dev/null +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampMinAggregatorFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Comparator; + +public class TimestampMinAggregatorFactory extends TimestampAggregatorFactory +{ + @JsonCreator + public TimestampMinAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("timeFormat") String timeFormat + ) + { + super(name, fieldName, timeFormat, new Comparator() { + @Override + public int compare(Long o1, Long o2) { + return -(Long.compare(o1, o2)); + } + }, Long.MAX_VALUE); + Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + } + + @Override + public String toString() + { + return "TimestampMinAggregatorFactory{" + + "fieldName='" + fieldName + '\'' + + ", name='" + name + '\'' + + '}'; + } +} diff --git a/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampMinMaxModule.java b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampMinMaxModule.java new file mode 100644 index 000000000000..3d0204b85585 --- /dev/null +++ b/extensions-contrib/time-min-max/src/main/java/io/druid/query/aggregation/TimestampMinMaxModule.java @@ -0,0 +1,49 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.query.aggregation; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.Arrays; +import java.util.List; + +public class TimestampMinMaxModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("TimestampMinMaxModule") + .registerSubtypes( + new NamedType(TimestampMaxAggregatorFactory.class, "timeMax"), + new NamedType(TimestampMinAggregatorFactory.class, "timeMin") + ) + ); + } + + @Override + public void configure(Binder binder) + { + + } +} diff --git a/extensions-contrib/time-min-max/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/time-min-max/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 000000000000..c69a5a45813a --- /dev/null +++ b/extensions-contrib/time-min-max/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.query.aggregation.TimestampMinMaxModule diff --git 
a/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampAggregationSelectTest.java b/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampAggregationSelectTest.java new file mode 100644 index 000000000000..33166200385a --- /dev/null +++ b/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampAggregationSelectTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.io.Resources; +import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.query.Result; +import io.druid.query.select.SelectResultValue; +import io.druid.segment.ColumnSelectorFactory; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.io.File; +import java.sql.Timestamp; +import java.util.List; +import java.util.zip.ZipFile; + +@RunWith(Parameterized.class) +public class TimestampAggregationSelectTest +{ + private AggregationTestHelper helper; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private ColumnSelectorFactory selectorFactory; + private TestObjectColumnSelector selector; + + private Timestamp[] values = new Timestamp[10]; + + @Parameterized.Parameters(name="{index}: Test for {0}") + public static Iterable constructorFeeder() + { + return Iterables.transform( + ImmutableList.of( + ImmutableList.of("timeMin", "tmin", TimestampMinAggregatorFactory.class, DateTime.parse("2011-01-12T01:00:00.000Z").getMillis()), + ImmutableList.of("timeMax", "tmax", TimestampMaxAggregatorFactory.class, DateTime.parse("2011-01-31T01:00:00.000Z").getMillis()) + ), + new Function, Object[]>() + { + @Nullable + @Override + public Object[] apply(List input) + { + return input.toArray(); + } + } + ); + } + + private String aggType; + private String aggField; + private Class aggClass; 
+ private Long expected; + + public TimestampAggregationSelectTest(String aggType, String aggField, Class aggClass, Long expected) + { + this.aggType = aggType; + this.aggField = aggField; + this.aggClass = aggClass; + this.expected = expected; + } + + @Before + public void setup() throws Exception + { + helper = AggregationTestHelper.createSelectQueryAggregationTestHelper( + new TimestampMinMaxModule().getJacksonModules(), + temporaryFolder + ); + + selector = new TestObjectColumnSelector(values); + selectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); + EasyMock.expect(selectorFactory.makeObjectColumnSelector("test")).andReturn(selector); + EasyMock.replay(selectorFactory); + + } + + @Test + public void testSimpleDataIngestionAndSelectTest() throws Exception + { + String recordParser = "{\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"tsv\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [\n" + + " \"product\"\n" + + " ],\n" + + " \"dimensionExclusions\": [],\n" + + " \"spatialDimensions\": []\n" + + " },\n" + + " \"columns\": [\n" + + " \"timestamp\",\n" + + " \"cat\",\n" + + " \"product\",\n" + + " \"prefer\",\n" + + " \"prefer2\",\n" + + " \"pty_country\"\n" + + " ]\n" + + " }\n" + + "}"; + String aggregator = "[\n" + + " {\n" + + " \"type\": \"" + aggType + "\",\n" + + " \"name\": \"" + aggField + "\",\n" + + " \"fieldName\": \"timestamp\"\n" + + " }\n" + + "]"; + ZipFile zip = new ZipFile(new File(this.getClass().getClassLoader().getResource("druid.sample.tsv.zip").toURI())); + Sequence seq = helper.createIndexAndRunQueryOnSegment( + zip.getInputStream(zip.getEntry("druid.sample.tsv")), + recordParser, + aggregator, + 0, + QueryGranularities.MONTH, + 100, + Resources.toString(Resources.getResource("select.json"), Charsets.UTF_8) + ); + + Result result = (Result) 
Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList())); + Assert.assertEquals(36, result.getValue().getEvents().size()); + Assert.assertEquals(expected, result.getValue().getEvents().get(0).getEvent().get(aggField)); + } +} diff --git a/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampGroupByAggregationTest.java b/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampGroupByAggregationTest.java new file mode 100644 index 000000000000..c7a76c0f510e --- /dev/null +++ b/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampGroupByAggregationTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import io.druid.data.input.MapBasedRow; +import io.druid.data.input.Row; +import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.Sequences; +import io.druid.segment.ColumnSelectorFactory; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.io.File; +import java.sql.Timestamp; +import java.util.List; +import java.util.zip.ZipFile; + +@RunWith(Parameterized.class) +public class TimestampGroupByAggregationTest +{ + private AggregationTestHelper helper; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private ColumnSelectorFactory selectorFactory; + private TestObjectColumnSelector selector; + + private Timestamp[] values = new Timestamp[10]; + + @Parameterized.Parameters(name="{index}: Test for {0}") + public static Iterable constructorFeeder() + { + return Iterables.transform( + ImmutableList.of( + ImmutableList.of("timeMin", "tmin", "time_min", TimestampMinAggregatorFactory.class, DateTime.parse("2011-01-12T01:00:00.000Z")), + ImmutableList.of("timeMax", "tmax", "time_max", TimestampMaxAggregatorFactory.class, DateTime.parse("2011-01-31T01:00:00.000Z")) + ), + new Function, Object[]>() + { + @Nullable + @Override + public Object[] apply(List input) + { + return input.toArray(); + } + } + ); + } + + private String aggType; + private String aggField; + private String groupByField; + private Class aggClass; + private DateTime expected; + + public 
TimestampGroupByAggregationTest(String aggType, String aggField, String groupByField, Class aggClass, DateTime expected) + { + this.aggType = aggType; + this.aggField = aggField; + this.groupByField = groupByField; + this.aggClass = aggClass; + this.expected = expected; + } + + @Before + public void setup() throws Exception + { + helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + new TimestampMinMaxModule().getJacksonModules(), + temporaryFolder + ); + + selector = new TestObjectColumnSelector(values); + selectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); + EasyMock.expect(selectorFactory.makeObjectColumnSelector("test")).andReturn(selector); + EasyMock.replay(selectorFactory); + + } + + @Test + public void testSimpleDataIngestionAndGroupByTest() throws Exception + { + String recordParser = "{\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"tsv\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [\n" + + " \"product\"\n" + + " ],\n" + + " \"dimensionExclusions\": [],\n" + + " \"spatialDimensions\": []\n" + + " },\n" + + " \"columns\": [\n" + + " \"timestamp\",\n" + + " \"cat\",\n" + + " \"product\",\n" + + " \"prefer\",\n" + + " \"prefer2\",\n" + + " \"pty_country\"\n" + + " ]\n" + + " }\n" + + "}"; + String aggregator = "[\n" + + " {\n" + + " \"type\": \"" + aggType + "\",\n" + + " \"name\": \"" + aggField + "\",\n" + + " \"fieldName\": \"timestamp\"\n" + + " }\n" + + "]"; + String groupBy = "{\n" + + " \"queryType\": \"groupBy\",\n" + + " \"dataSource\": \"test_datasource\",\n" + + " \"granularity\": \"MONTH\",\n" + + " \"dimensions\": [\"product\"],\n" + + " \"aggregations\": [\n" + + " {\n" + + " \"type\": \"" + aggType + "\",\n" + + " \"name\": \"" + groupByField + "\",\n" + + " \"fieldName\": \"" + aggField + "\"\n" + + " }\n" + + " ],\n" + + " \"intervals\": [\n" + + " 
\"2011-01-01T00:00:00.000Z/2011-05-01T00:00:00.000Z\"\n" + + " ]\n" + + "}"; + ZipFile zip = new ZipFile(new File(this.getClass().getClassLoader().getResource("druid.sample.tsv.zip").toURI())); + Sequence seq = helper.createIndexAndRunQueryOnSegment( + zip.getInputStream(zip.getEntry("druid.sample.tsv")), + recordParser, + aggregator, + 0, + QueryGranularities.MONTH, + 100, + groupBy + ); + + List results = Sequences.toList(seq, Lists.newArrayList()); + Assert.assertEquals(36, results.size()); + Assert.assertEquals(expected, ((MapBasedRow)results.get(0)).getEvent().get(groupByField)); + } +} diff --git a/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampMinMaxAggregatorTest.java b/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampMinMaxAggregatorTest.java new file mode 100644 index 000000000000..00823aaafc07 --- /dev/null +++ b/extensions-contrib/time-min-max/src/test/java/io/druid/query/aggregation/TimestampMinMaxAggregatorTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.aggregation; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.primitives.Longs; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import io.druid.guice.GuiceInjectors; +import io.druid.initialization.Initialization; +import io.druid.segment.ColumnSelectorFactory; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.util.List; + +@RunWith(Parameterized.class) +public class TimestampMinMaxAggregatorTest +{ + Injector injector; + ObjectMapper mapper; + + private TimestampAggregatorFactory aggregatorFactory; + private ColumnSelectorFactory selectorFactory; + private TestObjectColumnSelector selector; + + private Timestamp[] values = { + Timestamp.valueOf("2014-01-02 11:00:00"), + Timestamp.valueOf("2014-01-02 01:00:00"), + Timestamp.valueOf("2014-01-02 05:00:00"), + Timestamp.valueOf("2014-01-02 12:00:00"), + Timestamp.valueOf("2014-01-02 12:00:00"), + Timestamp.valueOf("2014-01-02 13:00:00"), + Timestamp.valueOf("2014-01-02 06:00:00"), + Timestamp.valueOf("2014-01-02 17:00:00"), + Timestamp.valueOf("2014-01-02 12:00:00"), + Timestamp.valueOf("2014-01-02 02:00:00") + }; + + @Parameterized.Parameters(name="{index}: Test for {0}") + public static Iterable constructorFeeder() + { + return Iterables.transform( + ImmutableList.of( + ImmutableList.of("timeMin", TimestampMinAggregatorFactory.class, Long.MAX_VALUE, Timestamp.valueOf("2014-01-02 01:00:00")), + ImmutableList.of("timeMax", TimestampMaxAggregatorFactory.class, Long.MIN_VALUE, 
Timestamp.valueOf("2014-01-02 17:00:00")) + ), + new Function, Object[]>() + { + @Nullable + @Override + public Object[] apply(List input) + { + return input.toArray(); + } + } + ); + } + + private String aggType; + private Class aggClass; + private Long initValue; + private Timestamp expected; + + public TimestampMinMaxAggregatorTest(String aggType, Class aggClass, Long initValue, Timestamp expected) + { + this.aggType = aggType; + this.aggClass = aggClass; + this.expected = expected; + this.initValue = initValue; + } + + @Before + public void setup() throws Exception + { + injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + } + }, + new TimestampMinMaxModule() + ) + ); + mapper = injector.getInstance(ObjectMapper.class); + + String json = "{\"type\":\"" + aggType + "\",\"name\":\"" + aggType + "\",\"fieldName\":\"test\"}"; + + aggregatorFactory = mapper.readValue(json, aggClass); + selector = new TestObjectColumnSelector(values); + selectorFactory = EasyMock.createMock(ColumnSelectorFactory.class); + EasyMock.expect(selectorFactory.makeObjectColumnSelector("test")).andReturn(selector); + EasyMock.replay(selectorFactory); + } + + @Test + public void testAggregator() + { + TimestampAggregator aggregator = (TimestampAggregator) aggregatorFactory.factorize(selectorFactory); + + Assert.assertEquals(aggType, aggregator.getName()); + + for (Timestamp value: values) { + aggregate(selector, aggregator); + } + + Assert.assertEquals(expected, new Timestamp(aggregator.getLong())); + + aggregator.reset(); + + Assert.assertEquals(initValue, aggregator.get()); + } + + @Test + public void testBufferAggregator() + { + TimestampBufferAggregator aggregator = (TimestampBufferAggregator) 
aggregatorFactory.factorizeBuffered(selectorFactory); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[Longs.BYTES]); + aggregator.init(buffer, 0); + + for (Timestamp value: values) { + aggregate(selector, aggregator, buffer, 0); + } + + Assert.assertEquals(expected, new Timestamp(aggregator.getLong(buffer, 0))); + + aggregator.init(buffer, 0); + + Assert.assertEquals(initValue, aggregator.get(buffer, 0)); + } + + private void aggregate(TestObjectColumnSelector selector, TimestampAggregator agg) + { + agg.aggregate(); + selector.increment(); + } + + private void aggregate(TestObjectColumnSelector selector, TimestampBufferAggregator agg, ByteBuffer buf, int pos) + { + agg.aggregate(buf, pos); + selector.increment(); + } +} diff --git a/extensions-contrib/time-min-max/src/test/resources/druid.sample.tsv.zip b/extensions-contrib/time-min-max/src/test/resources/druid.sample.tsv.zip new file mode 100644 index 000000000000..36fa4671c7c7 Binary files /dev/null and b/extensions-contrib/time-min-max/src/test/resources/druid.sample.tsv.zip differ diff --git a/extensions-contrib/time-min-max/src/test/resources/select.json b/extensions-contrib/time-min-max/src/test/resources/select.json new file mode 100644 index 000000000000..95547819aae0 --- /dev/null +++ b/extensions-contrib/time-min-max/src/test/resources/select.json @@ -0,0 +1,11 @@ +{ + "queryType": "select", + "dataSource": "test_datasource", + "dimensions":[], + "metrics":[], + "granularity": "ALL", + "intervals": [ + "2011-01-01T00:00:00.000Z/2011-05-01T00:00:00.000Z" + ], + "pagingSpec":{"pagingIdentifiers": {}, "threshold":100} +} diff --git a/pom.xml b/pom.xml index 10d2e5f6b3bc..15dc4fcb6c43 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,7 @@ extensions-contrib/parquet-extensions extensions-contrib/statsd-emitter extensions-contrib/orc-extensions + extensions-contrib/time-min-max diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java 
b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 49aab7b50a16..0933f554bb13 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -20,6 +20,7 @@ package io.druid.segment.incremental; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Enums; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; @@ -172,13 +173,10 @@ public Object get() } }; - if (!deserializeComplexMetrics) { + if ((Enums.getIfPresent(ValueType.class, typeName.toUpperCase()).isPresent() && !typeName.equalsIgnoreCase(ValueType.COMPLEX.name())) + || !deserializeComplexMetrics) { return rawColumnSelector; } else { - if (typeName.equals("float")) { - return rawColumnSelector; - } - final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); if (serde == null) { throw new ISE("Don't know how to handle type[%s]", typeName);