From 30288edeebb7237f5a69f26e434b866e3c077f43 Mon Sep 17 00:00:00 2001 From: tallong Date: Thu, 16 Mar 2017 04:48:20 +0800 Subject: [PATCH 1/5] tupleDoubleArraySketch --- .../doublearray/SketchAggregatorFactory.java | 164 +++++++++ .../SketchBuildComplexMetricSerde.java | 53 +++ .../doublearray/SketchJsonSerializer.java | 43 +++ .../SketchMergeAggregatorFactory.java | 330 ++++++++++++++++++ .../SketchMergeComplexMetricSerde.java | 89 +++++ .../tuple/doublearray/SketchModule.java | 77 ++++ .../doublearray/SketchObjectStrategy.java | 109 ++++++ .../tuple/doublearray/SketchOperations.java | 84 +++++ .../aggregator/EmptySketchAggregator.java | 72 ++++ .../aggregator/SketchAggregator.java | 153 ++++++++ .../SketchIntersectionAggregator.java | 79 +++++ .../aggregator/SketchMaxAggregator.java | 60 ++++ .../aggregator/SketchMinAggregator.java | 60 ++++ .../aggregator/SketchUnionAggregator.java | 75 ++++ .../EmptySketchBufferAggregator.java | 70 ++++ .../SketchBufferAggregator.java | 113 ++++++ .../SketchIntersectionBufferAggregator.java | 90 +++++ .../SketchMaxBufferAggregator.java | 67 ++++ .../SketchMinBufferAggregator.java | 67 ++++ .../SketchUnionBufferAggregator.java | 97 +++++ .../io.druid.initialization.DruidModule | 1 + 21 files changed, 1953 insertions(+) create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeComplexMetricSerde.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchObjectStrategy.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/EmptySketchAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchIntersectionAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMaxAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/EmptySketchBufferAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchBufferAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchIntersectionBufferAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java new file mode 100644 index 000000000000..452dbd6aa938 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java @@ -0,0 +1,164 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Ints; +import com.yahoo.sketches.Util; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +import io.druid.query.aggregation.AggregatorFactory; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public abstract class SketchAggregatorFactory extends AggregatorFactory { + public static final int DEFAULT_MAX_SKETCH_SIZE = 16384; + + protected final String name; + protected final String fieldName; + protected final List fieldNames; + protected final int size; + private final byte cacheId; + + public static final Comparator COMPARATOR = new Comparator() { + @Override + public int compare(ArrayOfDoublesSketch o, ArrayOfDoublesSketch o1) { + return Doubles.compare(o.getEstimate(), o1.getEstimate()); + } + }; + + public SketchAggregatorFactory(String name, String fieldName,List fieldNames, Integer size, byte cacheId) { + this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + this.fieldName = Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Must have a valid, non-null fieldNames"); + + this.size = size == null ? DEFAULT_MAX_SKETCH_SIZE : size; + Util.checkIfPowerOf2(this.size, "size"); + + this.cacheId = cacheId; + } + + + + @Override + public Object deserialize(Object object) { + return SketchOperations.deserialize(object); + } + + @Override + public Comparator getComparator() { + return COMPARATOR; + } + + + @Override + @JsonProperty + public String getName() { + return name; + } + + @JsonProperty + public String getFieldName(){ + return fieldName; + } + + @JsonProperty + public List getFieldNames() { + return fieldNames; + } + + @JsonProperty + public int getSize() { + return size; + } + + + @Override + public List requiredFields() { + return Collections.singletonList(fieldName); + } + + @Override + public byte[] getCacheKey() { + byte[] fieldNameBytes = fieldName.getBytes(); + return ByteBuffer.allocate(1 + Ints.BYTES + fieldNameBytes.length) + .put(cacheId) + .putInt(size) + .put(fieldNameBytes) + .array(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + + "fieldName='" + fieldName + '\'' + + ", fieldNames='" + fieldNames + '\'' + + ", name='" + name + '\'' + + ", size=" + size + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SketchAggregatorFactory that = (SketchAggregatorFactory) o; + + if (size != that.size) { + return false; + } + if (cacheId != that.cacheId) { + return false; + } + if (!name.equals(that.name)) { + return false; + } + if(!fieldName.equals(that.fieldName)){ + return false; + } + return fieldNames.equals(that.fieldNames); + + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + fieldName.hashCode(); + result = 31 * result + fieldNames.hashCode(); + result = 31 * result + size; + result = 31 * result + (int) cacheId; + return result; + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java new file mode 100644 index 000000000000..6a3fcc71740d --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java @@ -0,0 +1,53 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import io.druid.data.input.InputRow; +import io.druid.segment.serde.ComplexMetricExtractor; + + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchBuildComplexMetricSerde extends SketchMergeComplexMetricSerde +{ + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + + @Override + public Class extractedClass() + { + return Object.class; + } + + @Override + public Object extractValue(InputRow inputRow, String metricName) { + + return inputRow.getRaw(metricName); + } + }; + } + +} \ No newline at end of file diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java new file mode 100644 index 000000000000..7e31c2bacbbe --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java @@ -0,0 +1,43 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.io.IOException; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchJsonSerializer extends JsonSerializer +{ + @Override + public void serialize(ArrayOfDoublesSketch sketch, JsonGenerator jgen, SerializerProvider provider) + throws IOException + { + jgen.writeBinary(sketch.toByteArray()); + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java new file mode 100644 index 000000000000..ce04e28ff061 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java @@ -0,0 +1,330 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.IAE; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketchIterator; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.EmptySketchAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchMaxAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchMinAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchUnionAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator.EmptySketchBufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator.SketchMaxBufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator.SketchMinBufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator.SketchUnionBufferAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchMergeAggregatorFactory extends SketchAggregatorFactory { + private static final byte CACHE_TYPE_ID = 15; + + private final boolean shouldFinalize; + private final boolean isInputTupleSketch; + private final int valuesCount; + private final String valuesFunction; + + private final String FUNC_SUM = "sum"; + private final String FUNC_MAX = "max"; + private final String FUNC_MIN = "min"; + + @JsonCreator + public SketchMergeAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("fieldNames") List fieldNames, + @JsonProperty("size") Integer size, + @JsonProperty("shouldFinalize") Boolean shouldFinalize, + @JsonProperty("isInputTupleSketch") Boolean isInputTupleSketch, + @JsonProperty("valuesCount") int valuesCount, + @JsonProperty("valuesFunction") String valuesFunction){ + super(name,fieldName, fieldNames, size, CACHE_TYPE_ID); + this.shouldFinalize = (shouldFinalize == null) ? true : shouldFinalize.booleanValue(); + this.isInputTupleSketch = (isInputTupleSketch == null) ? false : isInputTupleSketch.booleanValue(); + this.valuesCount=valuesCount; + this.valuesFunction = valuesFunction; + } + + @SuppressWarnings({"rawtypes" }) + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) { + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + List selectors = new ArrayList<>(); + for(String fieldName : fieldNames){ + FloatColumnSelector s = metricFactory.makeFloatColumnSelector(fieldName); + selectors.add(s); + } + + if (selector==null || selectors.isEmpty()) { + return new EmptySketchAggregator(name); + } + + switch(this.valuesFunction){ + case FUNC_SUM: + return new SketchUnionAggregator(name, selector,selectors, size,valuesCount); + case FUNC_MAX: + return new SketchMaxAggregator(name, selector, selectors, size, valuesCount); + case FUNC_MIN: + return new SketchMinAggregator(name, selector, selectors, size, valuesCount); + default: + throw new IAE("not support this function "+this.valuesFunction); + } + } + + @SuppressWarnings("rawtypes") + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + List selectors = new ArrayList<>(); + for(String fieldName : fieldNames){ + FloatColumnSelector s = metricFactory.makeFloatColumnSelector(fieldName); + selectors.add(s); + } + if (selector==null || selectors.isEmpty()) { + return new EmptySketchBufferAggregator(); + } + switch(this.valuesFunction){ + case FUNC_SUM: + return new SketchUnionBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); + case FUNC_MAX: + return new SketchMaxBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); + case FUNC_MIN: + return new SketchMinBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); + default: + throw new IAE("not support this function "+this.valuesFunction); + } + } + + + @Override + public int getMaxIntermediateSize() { + return ArrayOfDoublesUnion.getMaxBytes(size, valuesCount); + } + + @Override + public List getRequiredColumns() { + return Collections.singletonList( + new SketchMergeAggregatorFactory( + name, + fieldName, + fieldNames, + size, + shouldFinalize, + isInputTupleSketch, + valuesCount, + valuesFunction + ) + ); + } + + + @Override + public AggregatorFactory getCombiningFactory() { + return new SketchMergeAggregatorFactory(name, fieldName,fieldNames, size, shouldFinalize, false,valuesCount,valuesFunction); + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException { + if (other.getName().equals(this.getName()) && other instanceof SketchMergeAggregatorFactory) { + SketchMergeAggregatorFactory castedOther = (SketchMergeAggregatorFactory) other; + + return new SketchMergeAggregatorFactory( + name, + fieldName, + fieldNames, + Math.max(size, castedOther.size), + shouldFinalize, + true,valuesCount,valuesFunction + ); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @JsonProperty + public boolean getShouldFinalize() { + return shouldFinalize; + } + + @JsonProperty + public boolean getIsInputTupleSketch() { + return isInputTupleSketch; + } + + @JsonProperty + public int getValuesCount(){ + return valuesCount; + } + + @JsonProperty + public String getValuesFunction(){ + return valuesFunction; + } + + @Override + public Object combine(Object lhs, Object rhs) { + switch(this.valuesFunction){ + case FUNC_SUM:{ + ArrayOfDoublesUnion union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildUnion(); + SketchAggregator.updateUnion(union, lhs); + SketchAggregator.updateUnion(union, rhs); + return union.getResult(); + } + case FUNC_MAX:{ + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildIntersection(); + SketchAggregator.updateIntersection(intersection, lhs,SketchAggregator.max); + SketchAggregator.updateIntersection(intersection, rhs,SketchAggregator.max); + return intersection.getResult(); + } + case FUNC_MIN:{ + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildIntersection(); + SketchAggregator.updateIntersection(intersection, lhs,SketchAggregator.min); + SketchAggregator.updateIntersection(intersection, rhs,SketchAggregator.min); + return intersection.getResult(); + } + default: + throw new IAE("not support this function "+this.valuesFunction); + } + + } + + /** + * Finalize the computation on sketch object and returns estimate from underlying + * sketch. + * + * @param object the sketch object + * @return sketch object + */ + @Override + public Object finalizeComputation(Object object) { + if (shouldFinalize) { + if(object instanceof ArrayOfDoublesSketch){ + ArrayOfDoublesSketch tuple=(ArrayOfDoublesSketch)object; + + double theta=tuple.getTheta(); + ArrayOfDoublesSketchIterator it = tuple.iterator(); + StringBuilder sb = new StringBuilder(); + sb.append("{"); + while(it.next()){ + sb.append("["); + for(double v : it.getValues()){ + long est = Math.round(v/theta); + sb.append(est+","); + } + if(sb.charAt(sb.length()-1) == ','){ + sb.deleteCharAt(sb.length()-1); + } + sb.append("],"); + } + if(sb.charAt(sb.length()-1) == ','){ + sb.deleteCharAt(sb.length()-1); + } + sb.append("}"); + return String.format("{\"total\":%s,\"detail\":%s}",tuple.getEstimate(),sb.toString()); + } + throw new IAE("not support this class "+object.getClass()); + } else { + return object; + } + } + + @Override + public String getTypeName() { + if (isInputTupleSketch) { + return SketchModule.TUPLE_DOUBLE_SKETCH_MERGE_AGG; + } else { + return SketchModule.TUPLE_DOUBLE_SKETCH_BUILD_AGG; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory) o; + + if (shouldFinalize != that.shouldFinalize) { + return false; + } + if (valuesCount != that.valuesCount) { + return false; + } + if (!valuesFunction.equals(that.valuesFunction)){ + return false; + } + return isInputTupleSketch == that.isInputTupleSketch; + + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (shouldFinalize ? 1 : 0); + result = 31 * result + (isInputTupleSketch ? 1 : 0); + result = 31 * result + valuesCount; + result = 31 * result + valuesFunction.hashCode(); + return result; + } + + @Override + public String toString() { + return "SketchMergeAggregatorFactory{" + + "fieldName=" + fieldName + + ", fieldNames=" + fieldNames + + ", name=" + name + + ", size=" + size + + ", shouldFinalize=" + shouldFinalize + + ", isInputTupleSketch=" + isInputTupleSketch + + ", valuesCount="+valuesCount + + ", valuesFunction="+valuesFunction + + "}"; + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeComplexMetricSerde.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeComplexMetricSerde.java new file mode 100644 index 000000000000..3150cc8280c3 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeComplexMetricSerde.java @@ -0,0 +1,89 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.nio.ByteBuffer; + +import com.yahoo.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +import io.druid.data.input.InputRow; +import io.druid.segment.column.ColumnBuilder; +import io.druid.segment.data.GenericIndexed; +import io.druid.segment.data.ObjectStrategy; +import io.druid.segment.serde.ComplexColumnPartSupplier; +import io.druid.segment.serde.ComplexMetricExtractor; +import io.druid.segment.serde.ComplexMetricSerde; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings("unchecked") +public class SketchMergeComplexMetricSerde extends ComplexMetricSerde +{ + private SketchObjectStrategy strategy = new SketchObjectStrategy(); + + @Override + public String getTypeName() + { + return SketchModule.TUPLE_DOUBLE_SKETCH; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class extractedClass() + { + return ArrayOfDoublesSketch.class; + } + + @Override + public Object extractValue(InputRow inputRow, String metricName) + { + final Object object = inputRow.getRaw(metricName); + if (object == null || object instanceof ArrayOfDoublesSketch || object instanceof Memory) { + return object; + } + return SketchOperations.deserialize(object); + + } + }; + } + + + @Override + public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder) + { + GenericIndexed ge = GenericIndexed.read(buffer, strategy); + builder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), ge)); + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return strategy; + } + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java new file mode 100644 index 000000000000..7c23b0eb5af7 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java @@ -0,0 +1,77 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * DOUBLEd with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software DOUBLEd under the License is DOUBLEd on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.util.Arrays; +import java.util.List; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +import io.druid.initialization.DruidModule; +import io.druid.segment.serde.ComplexMetrics; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchModule implements DruidModule +{ + public static final String TUPLE_DOUBLE_SKETCH = "tupleDoubleArraySketch"; + public static final String TUPLE_DOUBLE_SKETCH_MERGE_AGG = "tupleDoubleArraySketchMerge"; + public static final String TUPLE_DOUBLE_SKETCH_BUILD_AGG = "tupleDoubleArraySketchBuild"; + + + @Override + public void configure(Binder binder) + { + if (ComplexMetrics.getSerdeForType(TUPLE_DOUBLE_SKETCH) == null) { + ComplexMetrics.registerSerde(TUPLE_DOUBLE_SKETCH, new SketchMergeComplexMetricSerde()); + } + + if (ComplexMetrics.getSerdeForType(TUPLE_DOUBLE_SKETCH_MERGE_AGG) == null) { + ComplexMetrics.registerSerde(TUPLE_DOUBLE_SKETCH_MERGE_AGG, new SketchMergeComplexMetricSerde()); + } + + if (ComplexMetrics.getSerdeForType(TUPLE_DOUBLE_SKETCH_BUILD_AGG) == null) { + ComplexMetrics.registerSerde(TUPLE_DOUBLE_SKETCH_BUILD_AGG, new SketchBuildComplexMetricSerde()); + } + } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("TupleDoubleArraySketchModule") + .registerSubtypes( + new NamedType(SketchMergeAggregatorFactory.class, TUPLE_DOUBLE_SKETCH) + ) + .addSerializer( + ArrayOfDoublesSketch.class , new SketchJsonSerializer() + ) + ); + } + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchObjectStrategy.java new file mode 100644 index 000000000000..d218afaf5c8a --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchObjectStrategy.java @@ -0,0 +1,109 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.nio.ByteBuffer; + +import com.google.common.primitives.Longs; +import com.metamx.common.IAE; +import com.yahoo.memory.Memory; +import com.yahoo.memory.MemoryRegion; +import com.yahoo.memory.NativeMemory; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +import io.druid.segment.data.ObjectStrategy; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings("rawtypes") +public class SketchObjectStrategy implements ObjectStrategy +{ + + private static final byte[] EMPTY_BYTES = new byte[]{}; + private static final ArrayOfDoublesSketch EMPTY_SKETCH = SketchOperations.EMPTY_SKETCH; + + @Override + public int compare(Object s1, Object s2) + { + if (s1 instanceof ArrayOfDoublesSketch) { + if (s2 instanceof ArrayOfDoublesSketch) { + return SketchAggregatorFactory.COMPARATOR.compare((ArrayOfDoublesSketch) s1, (ArrayOfDoublesSketch) s2); + } else { + return -1; + } + } + if (s1 instanceof Memory) { + if (s2 instanceof Memory) { + Memory s1Mem = (Memory) s1; + Memory s2Mem = (Memory) s2; + + // We have two Ordered Compact sketches, so just compare their last entry if they have the size. + // This is to produce a deterministic ordering, though it might not match the actual estimate + // ordering, but that's ok because this comparator is only used by GenericIndexed + int retVal = Longs.compare(s1Mem.getCapacity(), s2Mem.getCapacity()); + if (retVal == 0) { + retVal = Longs.compare(s1Mem.getLong(s1Mem.getCapacity() - 8), s2Mem.getLong(s2Mem.getCapacity() - 8)); + } + + return retVal; + } else { + return 1; + } + } + throw new IAE("Unknwon class[%s], toString[%s]", s1.getClass(), s1); + + } + + @Override + public Class getClazz() + { + return ArrayOfDoublesSketch.class; + } + + @Override + public Object fromByteBuffer(ByteBuffer buffer, int numBytes) + { + if (numBytes == 0) { + return EMPTY_SKETCH; + } + + return new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes); + } + + @Override + public byte[] toBytes(Object obj) + { + if (obj instanceof ArrayOfDoublesSketch) { + return ((ArrayOfDoublesSketch) obj).toByteArray(); + } else if (obj instanceof Memory) { + Memory mem = (Memory) obj; + byte[] retVal = new byte[(int) mem.getCapacity()]; + mem.getByteArray(0, retVal, 0, (int) mem.getCapacity()); + return retVal; + } else if (obj == null) { + return EMPTY_BYTES; + } else { + throw new IAE("Unknown class[%s], toString[%s]", obj.getClass(), obj); + } + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java new file mode 100644 index 000000000000..e3199bcdc6e8 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java @@ -0,0 +1,84 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import org.apache.commons.codec.binary.Base64; + +import com.google.common.base.Charsets; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.memory.Memory; +import com.yahoo.memory.NativeMemory; +import com.yahoo.sketches.theta.Sketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketches; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchOperations { + + public static final ArrayOfDoublesSketch EMPTY_SKETCH = new ArrayOfDoublesUpdatableSketchBuilder() + .setNominalEntries(16384).setResizeFactor(ResizeFactor.X2).setNumberOfValues(1).build(); + + public static ArrayOfDoublesUpdatableSketch createSketch(int size, int valuesCount, Object key, Object values) { + ArrayOfDoublesUpdatableSketch sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(size) + .setResizeFactor(ResizeFactor.X2).setNumberOfValues(valuesCount).build(); + String KeyStr = (String) key; + double[] valuesAry = (double[]) values; + double[] finalValuesAry = new double[valuesCount]; + System.arraycopy(valuesAry, 0, finalValuesAry, 0, valuesAry.length); + sketch.update(KeyStr, finalValuesAry); + return sketch; + } + + public static ArrayOfDoublesSketch deserialize(Object serializedSketch) { + if (serializedSketch instanceof String) { + return deserializeFromBase64EncodedString((String) serializedSketch); + } else if (serializedSketch instanceof byte[]) { + return deserializeFromByteArray((byte[]) serializedSketch); + } else if (serializedSketch instanceof ArrayOfDoublesSketch) { + return (ArrayOfDoublesSketch) serializedSketch; + } + + throw new IllegalStateException( + "Object is not of a type that can deserialize to sketch: " + serializedSketch.getClass()); + } + + public static ArrayOfDoublesSketch deserializeFromBase64EncodedString(String str) { + return deserializeFromByteArray(Base64.decodeBase64(str.getBytes(Charsets.UTF_8))); + } + + public static ArrayOfDoublesSketch deserializeFromByteArray(byte[] data) { + return deserializeFromMemory(new NativeMemory(data)); + } + + public static ArrayOfDoublesSketch deserializeFromMemory(Memory mem) { + if (Sketch.getSerializationVersion(mem) < 3) { + return ArrayOfDoublesSketches.heapifySketch(mem); + } else { + return ArrayOfDoublesSketches.wrapSketch(mem); + } + } + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/EmptySketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/EmptySketchAggregator.java new file mode 100644 index 000000000000..77b4a253d5b1 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/EmptySketchAggregator.java @@ -0,0 +1,72 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.SketchOperations; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class EmptySketchAggregator implements Aggregator +{ + private final String name; + + public EmptySketchAggregator(String name) + { + this.name = name; + } + + @Override + public void aggregate() + { + } + + @Override + public void reset() + { + } + + @Override + public Object get() + { + return SketchOperations.EMPTY_SKETCH; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("Not implemented"); + } + + + @Override + public void close() + { + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java new file mode 100644 index 000000000000..44511fc65dda --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java @@ -0,0 +1,153 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import java.util.List; + +import com.metamx.common.IAE; +import com.metamx.common.logger.Logger; +import com.yahoo.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; + +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.SketchOperations; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({"rawtypes"}) +public abstract class SketchAggregator implements Aggregator { + + protected static final Logger logger = new Logger(SketchAggregator.class); + + protected final List selectors; + protected final ObjectColumnSelector selector; + protected final String name; + protected final int size; + protected final int valuesCount; + + + public SketchAggregator(String name,ObjectColumnSelector selector,List selectors,int size,int valuesCount) { + this.name = name; + this.selector = selector; + this.selectors = selectors; + this.size = size; + this.valuesCount = valuesCount; + } + + @Override + public void aggregate() { + Object key = selector.get(); + double[] values = new double[selectors.size()]; + for(int i=0;i selectors,int size,int valuesCount) { + super(name, selector, selectors, size, valuesCount); + + } + + @Override + public void update(Object key) { + ArrayOfDoublesSketch sketch = parseSketch(key); + + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildIntersection(); + intersection.update(sketch, getCombiner()); + intersection.update(union.getResult(), getCombiner()); + + ArrayOfDoublesAnotB anotb = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + anotb.update(union.getResult(), sketch); + + ArrayOfDoublesAnotB bnota = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + bnota.update(sketch,union.getResult()); + + union.reset(); + union.update(intersection.getResult()); + union.update(anotb.getResult()); + union.update(bnota.getResult()); + + } + + + public abstract ArrayOfDoublesCombiner getCombiner(); + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMaxAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMaxAggregator.java new file mode 100644 index 000000000000..c934c758a265 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMaxAggregator.java @@ -0,0 +1,60 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import java.util.List; + +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({"rawtypes","unused"}) +public class SketchMaxAggregator extends SketchIntersectionAggregator { + + public SketchMaxAggregator(String name, ObjectColumnSelector selector, List selectors, + int size, int valuesCount) { + super(name, selector, selectors, size, valuesCount); + } + + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.max; + } + + + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java new file mode 100644 index 000000000000..11ceebe04e7c --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java @@ -0,0 +1,60 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import java.util.List; + +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({"rawtypes","unused"}) +public class SketchMinAggregator extends SketchIntersectionAggregator { + + public SketchMinAggregator(String name, ObjectColumnSelector selector, List selectors, + int size, int valuesCount) { + super(name, selector, selectors, size, valuesCount); + } + + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.min; + } + + + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java new file mode 100644 index 000000000000..092a005040f4 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import java.util.List; + +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({"rawtypes","unused"}) +public class SketchUnionAggregator extends SketchAggregator { + + protected ArrayOfDoublesUnion union; + + public SketchUnionAggregator(String name,ObjectColumnSelector selector,List selectors,int size,int valuesCount) { + super(name, selector, selectors, size, valuesCount); + this.union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildUnion(); + } + + @Override + public void update(Object key) { + updateUnion(union, key); + } + + @Override + public void reset() { + union.reset(); + } + + @Override + public Object get() { + return union.getResult(); + } + + @Override + public void close() { + union = null; + } + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/EmptySketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/EmptySketchBufferAggregator.java new file mode 100644 index 000000000000..c28d4479f4fe --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/EmptySketchBufferAggregator.java @@ -0,0 +1,70 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.SketchOperations; + +import java.nio.ByteBuffer; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class EmptySketchBufferAggregator implements BufferAggregator +{ + public EmptySketchBufferAggregator() + { + } + + @Override + public void init(ByteBuffer buf, int position) + { + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return SketchOperations.EMPTY_SKETCH; + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void close() + { + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchBufferAggregator.java new file mode 100644 index 000000000000..11e31dcc1ee9 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchBufferAggregator.java @@ -0,0 +1,113 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.memory.Memory; +import com.yahoo.memory.MemoryRegion; +import com.yahoo.memory.NativeMemory; +import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.SketchOperations; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({ "rawtypes", "unused" }) +public abstract class SketchBufferAggregator implements BufferAggregator { + protected static final Logger logger = new Logger(SketchAggregator.class); + + protected final ObjectColumnSelector selector; + protected final List selectors; + protected final int size; + protected final int valuesCount; + protected final int maxIntermediateSize; + + protected NativeMemory nm; + + public SketchBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount,int maxIntermediateSize) { + this.selector = selector; + this.selectors = selectors; + this.size = size; + this.valuesCount = valuesCount; + this.maxIntermediateSize = maxIntermediateSize; + } + + @Override + public void init(ByteBuffer buf, int position) { + if (nm == null) { + nm = new NativeMemory(buf); + } + Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); + register(position, mem); + } + + public abstract void register(int position,Memory mem); + + @Override + public void aggregate(ByteBuffer buf, int position) { + Object key = selector.get(); + double[] values = new double[selectors.size()]; + for(int i=0;i selectors, int size, + int valuesCount,int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + + @Override + public void update(ByteBuffer buf,int position,Object key) { + ArrayOfDoublesSketch sketch = SketchAggregator.parseSketch(key); + + ArrayOfDoublesUnion union = unions.get(position); + + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildIntersection(); + intersection.update(sketch, getCombiner()); + intersection.update(union.getResult(), getCombiner()); + + ArrayOfDoublesAnotB anotb = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + anotb.update(union.getResult(), sketch); + + ArrayOfDoublesAnotB bnota = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + bnota.update(sketch,union.getResult()); + + union.reset(); + union.update(intersection.getResult()); + union.update(anotb.getResult()); + union.update(bnota.getResult()); + + } + + public abstract ArrayOfDoublesCombiner getCombiner(); + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java new file mode 100644 index 000000000000..5ad403fd5d2c --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.memory.Memory; +import com.yahoo.memory.MemoryRegion; +import com.yahoo.memory.NativeMemory; +import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({ "rawtypes", "unused" }) +public class SketchMaxBufferAggregator extends SketchIntersectionBufferAggregator{ + + public SketchMaxBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount, int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.max; + } + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java new file mode 100644 index 000000000000..3c31b0f04efd --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.memory.Memory; +import com.yahoo.memory.MemoryRegion; +import com.yahoo.memory.NativeMemory; +import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({ "rawtypes", "unused" }) +public class SketchMinBufferAggregator extends SketchIntersectionBufferAggregator{ + + public SketchMinBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount, int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.min; + } + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java new file mode 100644 index 000000000000..1f71a5e4c842 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java @@ -0,0 +1,97 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.memory.Memory; +import com.yahoo.memory.MemoryRegion; +import com.yahoo.memory.NativeMemory; +import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({ "rawtypes", "unused" }) +public class SketchUnionBufferAggregator extends SketchBufferAggregator { + + protected final Map unions = new HashMap<>(); + + public SketchUnionBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount,int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + @Override + public void register(int position,Memory mem) { + unions.put(position, new ArrayOfDoublesSetOperationBuilder().setMemory(mem).setNominalEntries(size).setNumberOfValues(valuesCount).buildUnion()); + + } + + @Override + public void update(ByteBuffer buf,int position,Object key) { + SketchAggregator.updateUnion(getUnion(buf,position),key); + } + + + + @Override + public Object get(ByteBuffer buf, int position) { + return getUnion(buf, position).getResult(); + } + + + @Override + public void close() { + unions.clear(); + } + + // Note that this is not threadsafe and I don't think it needs to be + private ArrayOfDoublesUnion getUnion(ByteBuffer buf, int position) { + ArrayOfDoublesUnion union = unions.get(position); + if (union == null) { + Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); + union = new ArrayOfDoublesSetOperationBuilder().setMemory(mem).setNominalEntries(this.size).setNumberOfValues(this.valuesCount).buildUnion(); + unions.put(position, union); + } + return union; + } + +} diff --git a/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule index efd10f3a7e78..dd49ec708d3a 100644 --- a/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +++ b/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -1,2 +1,3 @@ io.druid.query.aggregation.datasketches.theta.SketchModule io.druid.query.aggregation.datasketches.theta.oldapi.OldApiSketchModule +io.druid.query.aggregation.datasketches.tuple.doublearray.SketchModule From ef13ed4fade0e31451c1948109690443ed124964 Mon Sep 17 00:00:00 2001 From: tallong Date: Mon, 20 Mar 2017 10:24:48 +0800 Subject: [PATCH 2/5] unused import && tab charactor --- .../doublearray/SketchAggregatorFactory.java | 4 +- .../SketchBuildComplexMetricSerde.java | 2 +- .../doublearray/SketchJsonSerializer.java | 1 - .../SketchMergeAggregatorFactory.java | 82 +++++++------ .../tuple/doublearray/SketchModule.java | 4 +- .../tuple/doublearray/SketchOperations.java | 72 +++++------ .../aggregator/SketchAggregator.java | 78 ++++++------ .../SketchIntersectionAggregator.java | 59 ++++----- .../aggregator/SketchMaxAggregator.java | 27 ++--- .../aggregator/SketchMinAggregator.java | 27 ++--- .../aggregator/SketchUnionAggregator.java | 19 +-- .../SketchBufferAggregator.java | 113 ++++++++---------- .../SketchIntersectionBufferAggregator.java | 70 +++++------ .../SketchMaxBufferAggregator.java | 38 ++---- .../SketchMinBufferAggregator.java | 38 ++---- .../SketchUnionBufferAggregator.java | 95 +++++++-------- 16 files changed, 301 insertions(+), 428 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java index 452dbd6aa938..c0e268b54a66 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java @@ -118,7 +118,7 @@ public byte[] getCacheKey() { @Override public String toString() { return getClass().getSimpleName() + "{" - + "fieldName='" + fieldName + '\'' + + "fieldName='" + fieldName + '\'' + ", fieldNames='" + fieldNames + '\'' + ", name='" + name + '\'' + ", size=" + size @@ -146,7 +146,7 @@ public boolean equals(Object o) { return false; } if(!fieldName.equals(that.fieldName)){ - return false; + return false; } return fieldNames.equals(that.fieldNames); diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java index 6a3fcc71740d..6118554f3dc8 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java @@ -50,4 +50,4 @@ public Object extractValue(InputRow inputRow, String metricName) { }; } -} \ No newline at end of file +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java index 7e31c2bacbbe..dbf574b53f00 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java @@ -22,7 +22,6 @@ import java.io.IOException; import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.SerializerProvider; import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java index ce04e28ff061..d1c5de2f4d10 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java @@ -26,13 +26,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.metamx.common.IAE; -import com.yahoo.sketches.ResizeFactor; import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; import com.yahoo.sketches.tuple.ArrayOfDoublesSketchIterator; import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; @@ -88,12 +86,12 @@ public SketchMergeAggregatorFactory( @SuppressWarnings({"rawtypes" }) @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { - ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); - List selectors = new ArrayList<>(); - for(String fieldName : fieldNames){ - FloatColumnSelector s = metricFactory.makeFloatColumnSelector(fieldName); - selectors.add(s); - } + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + List selectors = new ArrayList<>(); + for(String fieldName : fieldNames){ + FloatColumnSelector s = metricFactory.makeFloatColumnSelector(fieldName); + selectors.add(s); + } if (selector==null || selectors.isEmpty()) { return new EmptySketchAggregator(name); @@ -101,37 +99,37 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) { switch(this.valuesFunction){ case FUNC_SUM: - return new SketchUnionAggregator(name, selector,selectors, size,valuesCount); + return new SketchUnionAggregator(name, selector,selectors, size,valuesCount); case FUNC_MAX: - return new SketchMaxAggregator(name, selector, selectors, size, valuesCount); + return new SketchMaxAggregator(name, selector, selectors, size, valuesCount); case FUNC_MIN: - return new SketchMinAggregator(name, selector, selectors, size, valuesCount); + return new SketchMinAggregator(name, selector, selectors, size, valuesCount); default: - throw new IAE("not support this function "+this.valuesFunction); + throw new IAE("not support this function "+this.valuesFunction); } } @SuppressWarnings("rawtypes") @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { - ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); - List selectors = new ArrayList<>(); - for(String fieldName : fieldNames){ - FloatColumnSelector s = metricFactory.makeFloatColumnSelector(fieldName); - selectors.add(s); - } + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + List selectors = new ArrayList<>(); + for(String fieldName : fieldNames){ + FloatColumnSelector s = metricFactory.makeFloatColumnSelector(fieldName); + selectors.add(s); + } if (selector==null || selectors.isEmpty()) { return new EmptySketchBufferAggregator(); } switch(this.valuesFunction){ case FUNC_SUM: - return new SketchUnionBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); + return new SketchUnionBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); case FUNC_MAX: - return new SketchMaxBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); + return new SketchMaxBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); case FUNC_MIN: - return new SketchMinBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); + return new SketchMinBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); default: - throw new IAE("not support this function "+this.valuesFunction); + throw new IAE("not support this function "+this.valuesFunction); } } @@ -198,32 +196,32 @@ public int getValuesCount(){ @JsonProperty public String getValuesFunction(){ - return valuesFunction; + return valuesFunction; } @Override public Object combine(Object lhs, Object rhs) { switch(this.valuesFunction){ case FUNC_SUM:{ - ArrayOfDoublesUnion union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildUnion(); + ArrayOfDoublesUnion union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildUnion(); SketchAggregator.updateUnion(union, lhs); SketchAggregator.updateUnion(union, rhs); return union.getResult(); } case FUNC_MAX:{ - ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildIntersection(); + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildIntersection(); SketchAggregator.updateIntersection(intersection, lhs,SketchAggregator.max); SketchAggregator.updateIntersection(intersection, rhs,SketchAggregator.max); return intersection.getResult(); } case FUNC_MIN:{ - ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildIntersection(); + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildIntersection(); SketchAggregator.updateIntersection(intersection, lhs,SketchAggregator.min); SketchAggregator.updateIntersection(intersection, rhs,SketchAggregator.min); return intersection.getResult(); } default: - throw new IAE("not support this function "+this.valuesFunction); + throw new IAE("not support this function "+this.valuesFunction); } } @@ -239,26 +237,26 @@ public Object combine(Object lhs, Object rhs) { public Object finalizeComputation(Object object) { if (shouldFinalize) { if(object instanceof ArrayOfDoublesSketch){ - ArrayOfDoublesSketch tuple=(ArrayOfDoublesSketch)object; + ArrayOfDoublesSketch tuple=(ArrayOfDoublesSketch)object; double theta=tuple.getTheta(); ArrayOfDoublesSketchIterator it = tuple.iterator(); StringBuilder sb = new StringBuilder(); sb.append("{"); while(it.next()){ - sb.append("["); - for(double v : it.getValues()){ - long est = Math.round(v/theta); - sb.append(est+","); - } - if(sb.charAt(sb.length()-1) == ','){ - sb.deleteCharAt(sb.length()-1); - } - sb.append("],"); + sb.append("["); + for(double v : it.getValues()){ + long est = Math.round(v/theta); + sb.append(est+","); + } + if(sb.charAt(sb.length()-1) == ','){ + sb.deleteCharAt(sb.length()-1); + } + sb.append("],"); + } + if(sb.charAt(sb.length()-1) == ','){ + sb.deleteCharAt(sb.length()-1); } - if(sb.charAt(sb.length()-1) == ','){ - sb.deleteCharAt(sb.length()-1); - } sb.append("}"); return String.format("{\"total\":%s,\"detail\":%s}",tuple.getEstimate(),sb.toString()); } @@ -298,7 +296,7 @@ public boolean equals(Object o) { return false; } if (!valuesFunction.equals(that.valuesFunction)){ - return false; + return false; } return isInputTupleSketch == that.isInputTupleSketch; @@ -317,7 +315,7 @@ public int hashCode() { @Override public String toString() { return "SketchMergeAggregatorFactory{" - + "fieldName=" + fieldName + + "fieldName=" + fieldName + ", fieldNames=" + fieldNames + ", name=" + name + ", size=" + size diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java index 7c23b0eb5af7..3a0ab9f09c89 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java @@ -1,7 +1,7 @@ /* * Licensed to Metamarkets Group Inc. (Metamarkets) under one * or more contributor license agreements. See the NOTICE file - * DOUBLEd with this work for additional information + * distributed with this work for additional information * regarding copyright ownership. Metamarkets licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance @@ -68,7 +68,7 @@ public List getJacksonModules() new NamedType(SketchMergeAggregatorFactory.class, TUPLE_DOUBLE_SKETCH) ) .addSerializer( - ArrayOfDoublesSketch.class , new SketchJsonSerializer() + ArrayOfDoublesSketch.class , new SketchJsonSerializer() ) ); } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java index e3199bcdc6e8..bbefc5790645 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java @@ -38,47 +38,47 @@ */ public class SketchOperations { - public static final ArrayOfDoublesSketch EMPTY_SKETCH = new ArrayOfDoublesUpdatableSketchBuilder() - .setNominalEntries(16384).setResizeFactor(ResizeFactor.X2).setNumberOfValues(1).build(); + public static final ArrayOfDoublesSketch EMPTY_SKETCH = new ArrayOfDoublesUpdatableSketchBuilder() + .setNominalEntries(16384).setResizeFactor(ResizeFactor.X2).setNumberOfValues(1).build(); - public static ArrayOfDoublesUpdatableSketch createSketch(int size, int valuesCount, Object key, Object values) { - ArrayOfDoublesUpdatableSketch sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(size) - .setResizeFactor(ResizeFactor.X2).setNumberOfValues(valuesCount).build(); - String KeyStr = (String) key; - double[] valuesAry = (double[]) values; - double[] finalValuesAry = new double[valuesCount]; - System.arraycopy(valuesAry, 0, finalValuesAry, 0, valuesAry.length); - sketch.update(KeyStr, finalValuesAry); - return sketch; - } + public static ArrayOfDoublesUpdatableSketch createSketch(int size, int valuesCount, Object key, Object values) { + ArrayOfDoublesUpdatableSketch sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(size) + .setResizeFactor(ResizeFactor.X2).setNumberOfValues(valuesCount).build(); + String KeyStr = (String) key; + double[] valuesAry = (double[]) values; + double[] finalValuesAry = new double[valuesCount]; + System.arraycopy(valuesAry, 0, finalValuesAry, 0, valuesAry.length); + sketch.update(KeyStr, finalValuesAry); + return sketch; + } - public static ArrayOfDoublesSketch deserialize(Object serializedSketch) { - if (serializedSketch instanceof String) { - return deserializeFromBase64EncodedString((String) serializedSketch); - } else if (serializedSketch instanceof byte[]) { - return deserializeFromByteArray((byte[]) serializedSketch); - } else if (serializedSketch instanceof ArrayOfDoublesSketch) { - return (ArrayOfDoublesSketch) serializedSketch; - } + public static ArrayOfDoublesSketch deserialize(Object serializedSketch) { + if (serializedSketch instanceof String) { + return deserializeFromBase64EncodedString((String) serializedSketch); + } else if (serializedSketch instanceof byte[]) { + return deserializeFromByteArray((byte[]) serializedSketch); + } else if (serializedSketch instanceof ArrayOfDoublesSketch) { + return (ArrayOfDoublesSketch) serializedSketch; + } - throw new IllegalStateException( - "Object is not of a type that can deserialize to sketch: " + serializedSketch.getClass()); - } + throw new IllegalStateException( + "Object is not of a type that can deserialize to sketch: " + serializedSketch.getClass()); + } - public static ArrayOfDoublesSketch deserializeFromBase64EncodedString(String str) { - return deserializeFromByteArray(Base64.decodeBase64(str.getBytes(Charsets.UTF_8))); - } + public static ArrayOfDoublesSketch deserializeFromBase64EncodedString(String str) { + return deserializeFromByteArray(Base64.decodeBase64(str.getBytes(Charsets.UTF_8))); + } - public static ArrayOfDoublesSketch deserializeFromByteArray(byte[] data) { - return deserializeFromMemory(new NativeMemory(data)); - } + public static ArrayOfDoublesSketch deserializeFromByteArray(byte[] data) { + return deserializeFromMemory(new NativeMemory(data)); + } - public static ArrayOfDoublesSketch deserializeFromMemory(Memory mem) { - if (Sketch.getSerializationVersion(mem) < 3) { - return ArrayOfDoublesSketches.heapifySketch(mem); - } else { - return ArrayOfDoublesSketches.wrapSketch(mem); - } - } + public static ArrayOfDoublesSketch deserializeFromMemory(Memory mem) { + if (Sketch.getSerializationVersion(mem) < 3) { + return ArrayOfDoublesSketches.heapifySketch(mem); + } else { + return ArrayOfDoublesSketches.wrapSketch(mem); + } + } } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java index 44511fc65dda..5edf135b79d4 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java @@ -42,7 +42,7 @@ @SuppressWarnings({"rawtypes"}) public abstract class SketchAggregator implements Aggregator { - protected static final Logger logger = new Logger(SketchAggregator.class); + protected static final Logger logger = new Logger(SketchAggregator.class); protected final List selectors; protected final ObjectColumnSelector selector; @@ -61,16 +61,16 @@ public SketchAggregator(String name,ObjectColumnSelector selector,List selectors, - int size, int valuesCount) { - super(name, selector, selectors, size, valuesCount); - } + public SketchMaxAggregator(String name, ObjectColumnSelector selector, List selectors, + int size, int valuesCount) { + super(name, selector, selectors, size, valuesCount); + } - @Override - public ArrayOfDoublesCombiner getCombiner() { - return SketchAggregator.max; - } + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.max; + } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java index 11ceebe04e7c..c9523a392bb8 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java @@ -21,18 +21,7 @@ import java.util.List; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; -import com.yahoo.sketches.ResizeFactor; -import com.yahoo.memory.Memory; import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; -import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; -import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; -import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; - -import io.druid.query.aggregation.Aggregator; import io.druid.segment.FloatColumnSelector; import io.druid.segment.ObjectColumnSelector; @@ -44,15 +33,15 @@ @SuppressWarnings({"rawtypes","unused"}) public class SketchMinAggregator extends SketchIntersectionAggregator { - public SketchMinAggregator(String name, ObjectColumnSelector selector, List selectors, - int size, int valuesCount) { - super(name, selector, selectors, size, valuesCount); - } + public SketchMinAggregator(String name, ObjectColumnSelector selector, List selectors, + int size, int valuesCount) { + super(name, selector, selectors, size, valuesCount); + } - @Override - public ArrayOfDoublesCombiner getCombiner() { - return SketchAggregator.min; - } + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.min; + } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java index 092a005040f4..d7ffde1b7c7c 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java @@ -21,18 +21,9 @@ import java.util.List; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; -import com.yahoo.sketches.ResizeFactor; -import com.yahoo.memory.Memory; -import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; -import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; -import io.druid.query.aggregation.Aggregator; import io.druid.segment.FloatColumnSelector; import io.druid.segment.ObjectColumnSelector; @@ -51,10 +42,10 @@ public SketchUnionAggregator(String name,ObjectColumnSelector selector,List selectors; - protected final int size; - protected final int valuesCount; - protected final int maxIntermediateSize; - - protected NativeMemory nm; - - public SketchBufferAggregator(ObjectColumnSelector selector, List selectors, int size, - int valuesCount,int maxIntermediateSize) { - this.selector = selector; - this.selectors = selectors; - this.size = size; - this.valuesCount = valuesCount; - this.maxIntermediateSize = maxIntermediateSize; - } - - @Override - public void init(ByteBuffer buf, int position) { - if (nm == null) { - nm = new NativeMemory(buf); - } - Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); - register(position, mem); - } - - public abstract void register(int position,Memory mem); - - @Override - public void aggregate(ByteBuffer buf, int position) { - Object key = selector.get(); - double[] values = new double[selectors.size()]; - for(int i=0;i selectors; + protected final int size; + protected final int valuesCount; + protected final int maxIntermediateSize; + + protected NativeMemory nm; + + public SketchBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount,int maxIntermediateSize) { + this.selector = selector; + this.selectors = selectors; + this.size = size; + this.valuesCount = valuesCount; + this.maxIntermediateSize = maxIntermediateSize; + } + + @Override + public void init(ByteBuffer buf, int position) { + if (nm == null) { + nm = new NativeMemory(buf); + } + Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); + register(position, mem); + } + + public abstract void register(int position,Memory mem); + + @Override + public void aggregate(ByteBuffer buf, int position) { + Object key = selector.get(); + double[] values = new double[selectors.size()]; + for(int i=0;i selectors, int size, - int valuesCount,int maxIntermediateSize) { - super(selector, selectors, size, valuesCount, maxIntermediateSize); - } + public SketchIntersectionBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount,int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + + @Override + public void update(ByteBuffer buf,int position,Object key) { + ArrayOfDoublesSketch sketch = SketchAggregator.parseSketch(key); + + ArrayOfDoublesUnion union = unions.get(position); + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildIntersection(); + intersection.update(sketch, getCombiner()); + intersection.update(union.getResult(), getCombiner()); - @Override - public void update(ByteBuffer buf,int position,Object key) { - ArrayOfDoublesSketch sketch = SketchAggregator.parseSketch(key); - - ArrayOfDoublesUnion union = unions.get(position); - - ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildIntersection(); - intersection.update(sketch, getCombiner()); - intersection.update(union.getResult(), getCombiner()); - - ArrayOfDoublesAnotB anotb = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); - anotb.update(union.getResult(), sketch); - - ArrayOfDoublesAnotB bnota = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); - bnota.update(sketch,union.getResult()); - - union.reset(); - union.update(intersection.getResult()); - union.update(anotb.getResult()); - union.update(bnota.getResult()); - - } + ArrayOfDoublesAnotB anotb = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + anotb.update(union.getResult(), sketch); + + ArrayOfDoublesAnotB bnota = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + bnota.update(sketch,union.getResult()); + + union.reset(); + union.update(intersection.getResult()); + union.update(anotb.getResult()); + union.update(bnota.getResult()); + + } public abstract ArrayOfDoublesCombiner getCombiner(); - + } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java index 5ad403fd5d2c..b884f0cb13ac 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java @@ -19,29 +19,9 @@ package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; -import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import com.metamx.common.IAE; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; -import com.yahoo.sketches.ResizeFactor; -import com.yahoo.memory.Memory; -import com.yahoo.memory.MemoryRegion; -import com.yahoo.memory.NativeMemory; -import com.yahoo.sketches.theta.SetOperation; -import com.yahoo.sketches.theta.Union; import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; -import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; -import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; -import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; -import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; - -import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; import io.druid.segment.FloatColumnSelector; import io.druid.segment.ObjectColumnSelector; @@ -54,14 +34,14 @@ @SuppressWarnings({ "rawtypes", "unused" }) public class SketchMaxBufferAggregator extends SketchIntersectionBufferAggregator{ - public SketchMaxBufferAggregator(ObjectColumnSelector selector, List selectors, int size, - int valuesCount, int maxIntermediateSize) { - super(selector, selectors, size, valuesCount, maxIntermediateSize); - } + public SketchMaxBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount, int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.max; + } - @Override - public ArrayOfDoublesCombiner getCombiner() { - return SketchAggregator.max; - } - } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java index 3c31b0f04efd..0c6d449ba8b3 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java @@ -19,29 +19,9 @@ package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; -import java.nio.ByteBuffer; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import com.metamx.common.IAE; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; -import com.yahoo.sketches.ResizeFactor; -import com.yahoo.memory.Memory; -import com.yahoo.memory.MemoryRegion; -import com.yahoo.memory.NativeMemory; -import com.yahoo.sketches.theta.SetOperation; -import com.yahoo.sketches.theta.Union; import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; -import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; -import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; -import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; -import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; - -import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; import io.druid.segment.FloatColumnSelector; import io.druid.segment.ObjectColumnSelector; @@ -54,14 +34,14 @@ @SuppressWarnings({ "rawtypes", "unused" }) public class SketchMinBufferAggregator extends SketchIntersectionBufferAggregator{ - public SketchMinBufferAggregator(ObjectColumnSelector selector, List selectors, int size, - int valuesCount, int maxIntermediateSize) { - super(selector, selectors, size, valuesCount, maxIntermediateSize); - } + public SketchMinBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount, int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } - @Override - public ArrayOfDoublesCombiner getCombiner() { - return SketchAggregator.min; - } - + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.min; + } + } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java index 1f71a5e4c842..f084fdf72c43 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java @@ -24,22 +24,11 @@ import java.util.List; import java.util.Map; -import com.metamx.common.IAE; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; -import com.yahoo.sketches.ResizeFactor; import com.yahoo.memory.Memory; import com.yahoo.memory.MemoryRegion; -import com.yahoo.memory.NativeMemory; -import com.yahoo.sketches.theta.SetOperation; -import com.yahoo.sketches.theta.Union; import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; -import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; -import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; -import io.druid.query.aggregation.BufferAggregator; import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; import io.druid.segment.FloatColumnSelector; import io.druid.segment.ObjectColumnSelector; @@ -52,46 +41,46 @@ @SuppressWarnings({ "rawtypes", "unused" }) public class SketchUnionBufferAggregator extends SketchBufferAggregator { - protected final Map unions = new HashMap<>(); - - public SketchUnionBufferAggregator(ObjectColumnSelector selector, List selectors, int size, - int valuesCount,int maxIntermediateSize) { - super(selector, selectors, size, valuesCount, maxIntermediateSize); - } - - @Override - public void register(int position,Memory mem) { - unions.put(position, new ArrayOfDoublesSetOperationBuilder().setMemory(mem).setNominalEntries(size).setNumberOfValues(valuesCount).buildUnion()); - - } - - @Override - public void update(ByteBuffer buf,int position,Object key) { - SketchAggregator.updateUnion(getUnion(buf,position),key); - } - - - - @Override - public Object get(ByteBuffer buf, int position) { - return getUnion(buf, position).getResult(); - } - - - @Override - public void close() { - unions.clear(); - } - - // Note that this is not threadsafe and I don't think it needs to be - private ArrayOfDoublesUnion getUnion(ByteBuffer buf, int position) { - ArrayOfDoublesUnion union = unions.get(position); - if (union == null) { - Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); - union = new ArrayOfDoublesSetOperationBuilder().setMemory(mem).setNominalEntries(this.size).setNumberOfValues(this.valuesCount).buildUnion(); - unions.put(position, union); - } - return union; - } - + protected final Map unions = new HashMap<>(); + + public SketchUnionBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount,int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + @Override + public void register(int position,Memory mem) { + unions.put(position, new ArrayOfDoublesSetOperationBuilder().setMemory(mem).setNominalEntries(size).setNumberOfValues(valuesCount).buildUnion()); + + } + + @Override + public void update(ByteBuffer buf,int position,Object key) { + SketchAggregator.updateUnion(getUnion(buf,position),key); + } + + + + @Override + public Object get(ByteBuffer buf, int position) { + return getUnion(buf, position).getResult(); + } + + + @Override + public void close() { + unions.clear(); + } + + // Note that this is not threadsafe and I don't think it needs to be + private ArrayOfDoublesUnion getUnion(ByteBuffer buf, int position) { + ArrayOfDoublesUnion union = unions.get(position); + if (union == null) { + Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); + union = new ArrayOfDoublesSetOperationBuilder().setMemory(mem).setNominalEntries(this.size).setNumberOfValues(this.valuesCount).buildUnion(); + unions.put(position, union); + } + return union; + } + } From d14bf422754ce99651c460d37f83d1eecdfa2e5b Mon Sep 17 00:00:00 2001 From: tallong Date: Mon, 20 Mar 2017 14:19:39 +0800 Subject: [PATCH 3/5] remove import * --- .../datasketches/tuple/doublearray/SketchModule.java | 2 +- .../aggregator/SketchIntersectionAggregator.java | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java index 3a0ab9f09c89..e78f66e4d55d 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java @@ -10,7 +10,7 @@ * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, - * software DOUBLEd under the License is DOUBLEd on an + * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchIntersectionAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchIntersectionAggregator.java index f6c8fd845749..691718060940 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchIntersectionAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchIntersectionAggregator.java @@ -19,7 +19,11 @@ package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; -import com.yahoo.sketches.tuple.*; +import com.yahoo.sketches.tuple.ArrayOfDoublesAnotB; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; import io.druid.segment.FloatColumnSelector; import io.druid.segment.ObjectColumnSelector; From 917a928d92d71e56f9be175538f6126cbac58dac Mon Sep 17 00:00:00 2001 From: tallong Date: Mon, 20 Mar 2017 15:28:01 +0800 Subject: [PATCH 4/5] modify finalizeComputation --- .../tuple/doublearray/SketchMergeAggregatorFactory.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java index d1c5de2f4d10..802cb005d9e8 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java @@ -238,16 +238,14 @@ public Object finalizeComputation(Object object) { if (shouldFinalize) { if(object instanceof ArrayOfDoublesSketch){ ArrayOfDoublesSketch tuple=(ArrayOfDoublesSketch)object; - - double theta=tuple.getTheta(); + ArrayOfDoublesSketchIterator it = tuple.iterator(); StringBuilder sb = new StringBuilder(); sb.append("{"); while(it.next()){ sb.append("["); for(double v : it.getValues()){ - long est = Math.round(v/theta); - sb.append(est+","); + sb.append(v+","); } if(sb.charAt(sb.length()-1) == ','){ sb.deleteCharAt(sb.length()-1); @@ -258,7 +256,7 @@ public Object finalizeComputation(Object object) { sb.deleteCharAt(sb.length()-1); } sb.append("}"); - return String.format("{\"total\":%s,\"detail\":%s}",tuple.getEstimate(),sb.toString()); + return String.format("{\"total\":%s,\"theta\":%s,\"detail\":%s}",tuple.getEstimate(),tuple.getTheta(),sb.toString()); } throw new IAE("not support this class "+object.getClass()); } else { From c43562f766b6fa8518129805bcb14756f511c212 Mon Sep 17 00:00:00 2001 From: tallong Date: Mon, 20 Mar 2017 18:04:26 +0800 Subject: [PATCH 5/5] rebuild --- .../tuple/doublearray/SketchMergeAggregatorFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java index 802cb005d9e8..12c2aae37fd3 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java @@ -227,7 +227,7 @@ public Object combine(Object lhs, Object rhs) { } /** - * Finalize the computation on sketch object and returns estimate from underlying + * Finalize the computation on sketch object and returns estimate and details from underlying * sketch. * * @param object the sketch object