diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml index edf0905c77a0..192b7b597f07 100644 --- a/extensions-core/datasketches/pom.xml +++ b/extensions-core/datasketches/pom.xml @@ -37,7 +37,7 @@ com.yahoo.datasketches sketches-core - 0.2.2 + 0.5.1 io.druid diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java new file mode 100644 index 000000000000..452dbd6aa938 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchAggregatorFactory.java @@ -0,0 +1,164 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Ints; +import com.yahoo.sketches.Util; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +import io.druid.query.aggregation.AggregatorFactory; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public abstract class SketchAggregatorFactory extends AggregatorFactory { + public static final int DEFAULT_MAX_SKETCH_SIZE = 16384; + + protected final String name; + protected final String fieldName; + protected final List fieldNames; + protected final int size; + private final byte cacheId; + + public static final Comparator COMPARATOR = new Comparator() { + @Override + public int compare(ArrayOfDoublesSketch o, ArrayOfDoublesSketch o1) { + return Doubles.compare(o.getEstimate(), o1.getEstimate()); + } + }; + + public SketchAggregatorFactory(String name, String fieldName,List fieldNames, Integer size, byte cacheId) { + this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); + this.fieldName = Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); + this.fieldNames = Preconditions.checkNotNull(fieldNames, "Must have a valid, non-null fieldNames"); + + this.size = size == null ? DEFAULT_MAX_SKETCH_SIZE : size; + Util.checkIfPowerOf2(this.size, "size"); + + this.cacheId = cacheId; + } + + + + @Override + public Object deserialize(Object object) { + return SketchOperations.deserialize(object); + } + + @Override + public Comparator getComparator() { + return COMPARATOR; + } + + + @Override + @JsonProperty + public String getName() { + return name; + } + + @JsonProperty + public String getFieldName(){ + return fieldName; + } + + @JsonProperty + public List getFieldNames() { + return fieldNames; + } + + @JsonProperty + public int getSize() { + return size; + } + + + @Override + public List requiredFields() { + return Collections.singletonList(fieldName); + } + + @Override + public byte[] getCacheKey() { + byte[] fieldNameBytes = fieldName.getBytes(); + return ByteBuffer.allocate(1 + Ints.BYTES + fieldNameBytes.length) + .put(cacheId) + .putInt(size) + .put(fieldNameBytes) + .array(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{" + + "fieldName='" + fieldName + '\'' + + ", fieldNames='" + fieldNames + '\'' + + ", name='" + name + '\'' + + ", size=" + size + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SketchAggregatorFactory that = (SketchAggregatorFactory) o; + + if (size != that.size) { + return false; + } + if (cacheId != that.cacheId) { + return false; + } + if (!name.equals(that.name)) { + return false; + } + if(!fieldName.equals(that.fieldName)){ + return false; + } + return fieldNames.equals(that.fieldNames); + + } + + @Override + public int hashCode() { + int result = name.hashCode(); + result = 31 * result + fieldName.hashCode(); + result = 31 * result + fieldNames.hashCode(); + result = 31 * result + size; + result = 31 * result + (int) cacheId; + return result; + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java new file mode 100644 index 000000000000..6a3fcc71740d --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchBuildComplexMetricSerde.java @@ -0,0 +1,53 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import io.druid.data.input.InputRow; +import io.druid.segment.serde.ComplexMetricExtractor; + + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchBuildComplexMetricSerde extends SketchMergeComplexMetricSerde +{ + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + + @Override + public Class extractedClass() + { + return Object.class; + } + + @Override + public Object extractValue(InputRow inputRow, String metricName) { + + return inputRow.getRaw(metricName); + } + }; + } + +} \ No newline at end of file diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java new file mode 100644 index 000000000000..b570aa245b3e --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchJsonSerializer.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.io.IOException; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchJsonSerializer extends JsonSerializer +{ + @Override + public void serialize(ArrayOfDoublesSketch sketch, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException + { + jgen.writeBinary(sketch.toByteArray()); + } +} + diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java new file mode 100644 index 000000000000..6c91551922dd --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeAggregatorFactory.java @@ -0,0 +1,335 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.metamx.common.IAE; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketchIterator; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.EmptySketchAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchMaxAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchMinAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchUnionAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator.EmptySketchBufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator.SketchMaxBufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator.SketchMinBufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator.SketchUnionBufferAggregator; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchMergeAggregatorFactory extends SketchAggregatorFactory { + private static final byte CACHE_TYPE_ID = 15; + + private final boolean shouldFinalize; + private final boolean isInputTupleSketch; + private final int valuesCount; + private final String valuesFunction; + + private final String FUNC_SUM = "sum"; + private final String FUNC_MAX = "max"; + private final String FUNC_MIN = "min"; + + @JsonCreator + public SketchMergeAggregatorFactory( + @JsonProperty("name") String name, + @JsonProperty("fieldName") String fieldName, + @JsonProperty("fieldNames") List fieldNames, + @JsonProperty("size") Integer size, + @JsonProperty("shouldFinalize") Boolean shouldFinalize, + @JsonProperty("isInputTupleSketch") Boolean isInputTupleSketch, + @JsonProperty("valuesCount") int valuesCount, + @JsonProperty("valuesFunction") String valuesFunction){ + super(name,fieldName, fieldNames, size, CACHE_TYPE_ID); + this.shouldFinalize = (shouldFinalize == null) ? true : shouldFinalize.booleanValue(); + this.isInputTupleSketch = (isInputTupleSketch == null) ? false : isInputTupleSketch.booleanValue(); + this.valuesCount=valuesCount; + this.valuesFunction = valuesFunction; + } + + @SuppressWarnings({"rawtypes" }) + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) { + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + List selectors = new ArrayList<>(); + for(String fieldName : fieldNames){ + FloatColumnSelector s = metricFactory.makeFloatColumnSelector(fieldName); + selectors.add(s); + } + + if (selector==null || selectors.isEmpty()) { + return new EmptySketchAggregator(name); + } + + switch(this.valuesFunction){ + case FUNC_SUM: + return new SketchUnionAggregator(name, selector,selectors, size,valuesCount); + case FUNC_MAX: + return new SketchMaxAggregator(name, selector, selectors, size, valuesCount); + case FUNC_MIN: + return new SketchMinAggregator(name, selector, selectors, size, valuesCount); + default: + throw new IAE("not support this function "+this.valuesFunction); + } + } + + @SuppressWarnings("rawtypes") + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { + ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName); + List selectors = new ArrayList<>(); + for(String fieldName : fieldNames){ + FloatColumnSelector s = metricFactory.makeFloatColumnSelector(fieldName); + selectors.add(s); + } + if (selector==null || selectors.isEmpty()) { + return new EmptySketchBufferAggregator(); + } + switch(this.valuesFunction){ + case FUNC_SUM: + return new SketchUnionBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); + case FUNC_MAX: + return new SketchMaxBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); + case FUNC_MIN: + return new SketchMinBufferAggregator(selector,selectors, size, valuesCount,getMaxIntermediateSize()); + default: + throw new IAE("not support this function "+this.valuesFunction); + } + } + + + @Override + public int getMaxIntermediateSize() { + return ArrayOfDoublesUnion.getMaxBytes(size, valuesCount); + } + + @Override + public List getRequiredColumns() { + return Collections.singletonList( + new SketchMergeAggregatorFactory( + name, + fieldName, + fieldNames, + size, + shouldFinalize, + isInputTupleSketch, + valuesCount, + valuesFunction + ) + ); + } + + @Override + public Object getAggregatorStartValue() { + return new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(this.size). + setResizeFactor(ResizeFactor.X2).setNumberOfValues(this.valuesCount).build(); + } + + @Override + public AggregatorFactory getCombiningFactory() { + return new SketchMergeAggregatorFactory(name, fieldName,fieldNames, size, shouldFinalize, false,valuesCount,valuesFunction); + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException { + if (other.getName().equals(this.getName()) && other instanceof SketchMergeAggregatorFactory) { + SketchMergeAggregatorFactory castedOther = (SketchMergeAggregatorFactory) other; + + return new SketchMergeAggregatorFactory( + name, + fieldName, + fieldNames, + Math.max(size, castedOther.size), + shouldFinalize, + true,valuesCount,valuesFunction + ); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @JsonProperty + public boolean getShouldFinalize() { + return shouldFinalize; + } + + @JsonProperty + public boolean getIsInputTupleSketch() { + return isInputTupleSketch; + } + + @JsonProperty + public int getValuesCount(){ + return valuesCount; + } + + @JsonProperty + public String getValuesFunction(){ + return valuesFunction; + } + + @Override + public Object combine(Object lhs, Object rhs) { + switch(this.valuesFunction){ + case FUNC_SUM:{ + ArrayOfDoublesUnion union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildUnion(); + SketchAggregator.updateUnion(union, lhs); + SketchAggregator.updateUnion(union, rhs); + return union.getResult(); + } + case FUNC_MAX:{ + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildIntersection(); + SketchAggregator.updateIntersection(intersection, lhs,SketchAggregator.max); + SketchAggregator.updateIntersection(intersection, rhs,SketchAggregator.max); + return intersection.getResult(); + } + case FUNC_MIN:{ + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(this.valuesCount).buildIntersection(); + SketchAggregator.updateIntersection(intersection, lhs,SketchAggregator.min); + SketchAggregator.updateIntersection(intersection, rhs,SketchAggregator.min); + return intersection.getResult(); + } + default: + throw new IAE("not support this function "+this.valuesFunction); + } + + } + + /** + * Finalize the computation on sketch object and returns estimate from underlying + * sketch. + * + * @param object the sketch object + * @return sketch object + */ + @Override + public Object finalizeComputation(Object object) { + if (shouldFinalize) { + if(object instanceof ArrayOfDoublesSketch){ + ArrayOfDoublesSketch tuple=(ArrayOfDoublesSketch)object; + + double theta=tuple.getTheta(); + ArrayOfDoublesSketchIterator it = tuple.iterator(); + StringBuilder sb = new StringBuilder(); + sb.append("{"); + while(it.next()){ + sb.append("["); + for(double v : it.getValues()){ + long est = Math.round(v/theta); + sb.append(est+","); + } + if(sb.charAt(sb.length()-1) == ','){ + sb.deleteCharAt(sb.length()-1); + } + sb.append("],"); + } + if(sb.charAt(sb.length()-1) == ','){ + sb.deleteCharAt(sb.length()-1); + } + sb.append("}"); + return String.format("{\"total\":%s,\"detail\":%s}",tuple.getEstimate(),sb.toString()); + } + throw new IAE("not support this class "+object.getClass()); + } else { + return object; + } + } + + @Override + public String getTypeName() { + if (isInputTupleSketch) { + return SketchModule.TUPLE_DOUBLE_SKETCH_MERGE_AGG; + } else { + return SketchModule.TUPLE_DOUBLE_SKETCH_BUILD_AGG; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + + SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory) o; + + if (shouldFinalize != that.shouldFinalize) { + return false; + } + if (valuesCount != that.valuesCount) { + return false; + } + if (!valuesFunction.equals(that.valuesFunction)){ + return false; + } + return isInputTupleSketch == that.isInputTupleSketch; + + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + (shouldFinalize ? 1 : 0); + result = 31 * result + (isInputTupleSketch ? 1 : 0); + result = 31 * result + valuesCount; + result = 31 * result + valuesFunction.hashCode(); + return result; + } + + @Override + public String toString() { + return "SketchMergeAggregatorFactory{" + + "fieldName=" + fieldName + + ", fieldNames=" + fieldNames + + ", name=" + name + + ", size=" + size + + ", shouldFinalize=" + shouldFinalize + + ", isInputTupleSketch=" + isInputTupleSketch + + ", valuesCount="+valuesCount + + ", valuesFunction="+valuesFunction + + "}"; + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeComplexMetricSerde.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeComplexMetricSerde.java new file mode 100644 index 000000000000..665009e43ee3 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchMergeComplexMetricSerde.java @@ -0,0 +1,89 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.nio.ByteBuffer; + +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +import io.druid.data.input.InputRow; +import io.druid.segment.column.ColumnBuilder; +import io.druid.segment.data.GenericIndexed; +import io.druid.segment.data.ObjectStrategy; +import io.druid.segment.serde.ComplexColumnPartSupplier; +import io.druid.segment.serde.ComplexMetricExtractor; +import io.druid.segment.serde.ComplexMetricSerde; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings("unchecked") +public class SketchMergeComplexMetricSerde extends ComplexMetricSerde +{ + private SketchObjectStrategy strategy = new SketchObjectStrategy(); + + @Override + public String getTypeName() + { + return SketchModule.TUPLE_DOUBLE_SKETCH; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class extractedClass() + { + return ArrayOfDoublesSketch.class; + } + + @Override + public Object extractValue(InputRow inputRow, String metricName) + { + final Object object = inputRow.getRaw(metricName); + if (object == null || object instanceof ArrayOfDoublesSketch || object instanceof Memory) { + return object; + } + return SketchOperations.deserialize(object); + + } + }; + } + + + @Override + public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder) + { + GenericIndexed ge = GenericIndexed.read(buffer, strategy); + builder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), ge)); + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return strategy; + } + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java new file mode 100644 index 000000000000..7c23b0eb5af7 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchModule.java @@ -0,0 +1,77 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * DOUBLEd with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software DOUBLEd under the License is DOUBLEd on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.util.Arrays; +import java.util.List; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +import io.druid.initialization.DruidModule; +import io.druid.segment.serde.ComplexMetrics; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchModule implements DruidModule +{ + public static final String TUPLE_DOUBLE_SKETCH = "tupleDoubleArraySketch"; + public static final String TUPLE_DOUBLE_SKETCH_MERGE_AGG = "tupleDoubleArraySketchMerge"; + public static final String TUPLE_DOUBLE_SKETCH_BUILD_AGG = "tupleDoubleArraySketchBuild"; + + + @Override + public void configure(Binder binder) + { + if (ComplexMetrics.getSerdeForType(TUPLE_DOUBLE_SKETCH) == null) { + ComplexMetrics.registerSerde(TUPLE_DOUBLE_SKETCH, new SketchMergeComplexMetricSerde()); + } + + if (ComplexMetrics.getSerdeForType(TUPLE_DOUBLE_SKETCH_MERGE_AGG) == null) { + ComplexMetrics.registerSerde(TUPLE_DOUBLE_SKETCH_MERGE_AGG, new SketchMergeComplexMetricSerde()); + } + + if (ComplexMetrics.getSerdeForType(TUPLE_DOUBLE_SKETCH_BUILD_AGG) == null) { + ComplexMetrics.registerSerde(TUPLE_DOUBLE_SKETCH_BUILD_AGG, new SketchBuildComplexMetricSerde()); + } + } + + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("TupleDoubleArraySketchModule") + .registerSubtypes( + new NamedType(SketchMergeAggregatorFactory.class, TUPLE_DOUBLE_SKETCH) + ) + .addSerializer( + ArrayOfDoublesSketch.class , new SketchJsonSerializer() + ) + ); + } + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchObjectStrategy.java new file mode 100644 index 000000000000..150254082d25 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchObjectStrategy.java @@ -0,0 +1,109 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import java.nio.ByteBuffer; + +import com.google.common.primitives.Longs; +import com.metamx.common.IAE; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.memory.MemoryRegion; +import com.yahoo.sketches.memory.NativeMemory; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; + +import io.druid.segment.data.ObjectStrategy; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings("rawtypes") +public class SketchObjectStrategy implements ObjectStrategy +{ + + private static final byte[] EMPTY_BYTES = new byte[]{}; + private static final ArrayOfDoublesSketch EMPTY_SKETCH = SketchOperations.EMPTY_SKETCH; + + @Override + public int compare(Object s1, Object s2) + { + if (s1 instanceof ArrayOfDoublesSketch) { + if (s2 instanceof ArrayOfDoublesSketch) { + return SketchAggregatorFactory.COMPARATOR.compare((ArrayOfDoublesSketch) s1, (ArrayOfDoublesSketch) s2); + } else { + return -1; + } + } + if (s1 instanceof Memory) { + if (s2 instanceof Memory) { + Memory s1Mem = (Memory) s1; + Memory s2Mem = (Memory) s2; + + // We have two Ordered Compact sketches, so just compare their last entry if they have the size. + // This is to produce a deterministic ordering, though it might not match the actual estimate + // ordering, but that's ok because this comparator is only used by GenericIndexed + int retVal = Longs.compare(s1Mem.getCapacity(), s2Mem.getCapacity()); + if (retVal == 0) { + retVal = Longs.compare(s1Mem.getLong(s1Mem.getCapacity() - 8), s2Mem.getLong(s2Mem.getCapacity() - 8)); + } + + return retVal; + } else { + return 1; + } + } + throw new IAE("Unknwon class[%s], toString[%s]", s1.getClass(), s1); + + } + + @Override + public Class getClazz() + { + return ArrayOfDoublesSketch.class; + } + + @Override + public Object fromByteBuffer(ByteBuffer buffer, int numBytes) + { + if (numBytes == 0) { + return EMPTY_SKETCH; + } + + return new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes); + } + + @Override + public byte[] toBytes(Object obj) + { + if (obj instanceof ArrayOfDoublesSketch) { + return ((ArrayOfDoublesSketch) obj).toByteArray(); + } else if (obj instanceof Memory) { + Memory mem = (Memory) obj; + byte[] retVal = new byte[(int) mem.getCapacity()]; + mem.getByteArray(0, retVal, 0, (int) mem.getCapacity()); + return retVal; + } else if (obj == null) { + return EMPTY_BYTES; + } else { + throw new IAE("Unknown class[%s], toString[%s]", obj.getClass(), obj); + } + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java new file mode 100644 index 000000000000..ac6b1a9a5bae --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/SketchOperations.java @@ -0,0 +1,84 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray; + +import org.apache.commons.codec.binary.Base64; + +import com.google.common.base.Charsets; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.memory.NativeMemory; +import com.yahoo.sketches.theta.Sketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketches; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class SketchOperations { + + public static final ArrayOfDoublesSketch EMPTY_SKETCH = new ArrayOfDoublesUpdatableSketchBuilder() + .setNominalEntries(16384).setResizeFactor(ResizeFactor.X2).setNumberOfValues(1).build(); + + public static ArrayOfDoublesUpdatableSketch createSketch(int size, int valuesCount, Object key, Object values) { + ArrayOfDoublesUpdatableSketch sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(size) + .setResizeFactor(ResizeFactor.X2).setNumberOfValues(valuesCount).build(); + String KeyStr = (String) key; + double[] valuesAry = (double[]) values; + double[] finalValuesAry = new double[valuesCount]; + System.arraycopy(valuesAry, 0, finalValuesAry, 0, valuesAry.length); + sketch.update(KeyStr, finalValuesAry); + return sketch; + } + + public static ArrayOfDoublesSketch deserialize(Object serializedSketch) { + if (serializedSketch instanceof String) { + return deserializeFromBase64EncodedString((String) serializedSketch); + } else if (serializedSketch instanceof byte[]) { + return deserializeFromByteArray((byte[]) serializedSketch); + } else if (serializedSketch instanceof ArrayOfDoublesSketch) { + return (ArrayOfDoublesSketch) serializedSketch; + } + + throw new IllegalStateException( + "Object is not of a type that can deserialize to sketch: " + serializedSketch.getClass()); + } + + public static ArrayOfDoublesSketch deserializeFromBase64EncodedString(String str) { + return deserializeFromByteArray(Base64.decodeBase64(str.getBytes(Charsets.UTF_8))); + } + + public static ArrayOfDoublesSketch deserializeFromByteArray(byte[] data) { + return deserializeFromMemory(new NativeMemory(data)); + } + + public static ArrayOfDoublesSketch deserializeFromMemory(Memory mem) { + if (Sketch.getSerializationVersion(mem) < 3) { + return ArrayOfDoublesSketches.heapifySketch(mem); + } else { + return ArrayOfDoublesSketches.wrapSketch(mem); + } + } + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/EmptySketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/EmptySketchAggregator.java new file mode 100644 index 000000000000..f619c0d79479 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/EmptySketchAggregator.java @@ -0,0 +1,77 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.SketchOperations; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class EmptySketchAggregator implements Aggregator +{ + private final String name; + + public EmptySketchAggregator(String name) + { + this.name = name; + } + + @Override + public void aggregate() + { + } + + @Override + public void reset() + { + } + + @Override + public Object get() + { + return SketchOperations.EMPTY_SKETCH; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public String getName() + { + return name; + } + + @Override + public void close() + { + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java new file mode 100644 index 000000000000..be56ecfd10ed --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchAggregator.java @@ -0,0 +1,157 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import java.util.List; + +import com.metamx.common.IAE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; + +import io.druid.query.aggregation.Aggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.SketchOperations; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({"rawtypes"}) +public abstract class SketchAggregator implements Aggregator { + + protected static final Logger logger = new Logger(SketchAggregator.class); + + protected final List selectors; + protected final ObjectColumnSelector selector; + protected final String name; + protected final int size; + protected final int valuesCount; + + + public SketchAggregator(String name,ObjectColumnSelector selector,List selectors,int size,int valuesCount) { + this.name = name; + this.selector = selector; + this.selectors = selectors; + this.size = size; + this.valuesCount = valuesCount; + } + + @Override + public void aggregate() { + Object key = selector.get(); + double[] values = new double[selectors.size()]; + for(int i=0;i selectors,int size,int valuesCount) { + super(name, selector, selectors, size, valuesCount); + + } + + @Override + public void update(Object key) { + ArrayOfDoublesSketch sketch = parseSketch(key); + + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildIntersection(); + intersection.update(sketch, getCombiner()); + intersection.update(union.getResult(), getCombiner()); + + ArrayOfDoublesAnotB anotb = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + anotb.update(union.getResult(), sketch); + + ArrayOfDoublesAnotB bnota = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + bnota.update(sketch,union.getResult()); + + union.reset(); + union.update(intersection.getResult()); + union.update(anotb.getResult()); + union.update(bnota.getResult()); + + } + + + public abstract ArrayOfDoublesCombiner getCombiner(); + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMaxAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMaxAggregator.java new file mode 100644 index 000000000000..3c24cc24b29c --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMaxAggregator.java @@ -0,0 +1,60 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import java.util.List; + +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({"rawtypes","unused"}) +public class SketchMaxAggregator extends SketchIntersectionAggregator { + + public SketchMaxAggregator(String name, ObjectColumnSelector selector, List selectors, + int size, int valuesCount) { + super(name, selector, selectors, size, valuesCount); + } + + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.max; + } + + + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java new file mode 100644 index 000000000000..060c90abe598 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchMinAggregator.java @@ -0,0 +1,60 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import java.util.List; + +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({"rawtypes","unused"}) +public class SketchMinAggregator extends SketchIntersectionAggregator { + + public SketchMinAggregator(String name, ObjectColumnSelector selector, List selectors, + int size, int valuesCount) { + super(name, selector, selectors, size, valuesCount); + } + + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.min; + } + + + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java new file mode 100644 index 000000000000..6ce37c9f2486 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/aggregator/SketchUnionAggregator.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator; + +import java.util.List; + +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.Aggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({"rawtypes","unused"}) +public class SketchUnionAggregator extends SketchAggregator { + + protected ArrayOfDoublesUnion union; + + public SketchUnionAggregator(String name,ObjectColumnSelector selector,List selectors,int size,int valuesCount) { + super(name, selector, selectors, size, valuesCount); + this.union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildUnion(); + } + + @Override + public void update(Object key) { + updateUnion(union, key); + } + + @Override + public void reset() { + union.reset(); + } + + @Override + public Object get() { + return union.getResult(); + } + + @Override + public void close() { + union = null; + } + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/EmptySketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/EmptySketchBufferAggregator.java new file mode 100644 index 000000000000..c28d4479f4fe --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/EmptySketchBufferAggregator.java @@ -0,0 +1,70 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.SketchOperations; + +import java.nio.ByteBuffer; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +public class EmptySketchBufferAggregator implements BufferAggregator +{ + public EmptySketchBufferAggregator() + { + } + + @Override + public void init(ByteBuffer buf, int position) + { + } + + @Override + public void aggregate(ByteBuffer buf, int position) + { + } + + @Override + public Object get(ByteBuffer buf, int position) + { + return SketchOperations.EMPTY_SKETCH; + } + + @Override + public float getFloat(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public long getLong(ByteBuffer buf, int position) + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void close() + { + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchBufferAggregator.java new file mode 100644 index 000000000000..beb401a96658 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchBufferAggregator.java @@ -0,0 +1,113 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.memory.MemoryRegion; +import com.yahoo.sketches.memory.NativeMemory; +import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.SketchOperations; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({ "rawtypes", "unused" }) +public abstract class SketchBufferAggregator implements BufferAggregator { + protected static final Logger logger = new Logger(SketchAggregator.class); + + protected final ObjectColumnSelector selector; + protected final List selectors; + protected final int size; + protected final int valuesCount; + protected final int maxIntermediateSize; + + protected NativeMemory nm; + + public SketchBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount,int maxIntermediateSize) { + this.selector = selector; + this.selectors = selectors; + this.size = size; + this.valuesCount = valuesCount; + this.maxIntermediateSize = maxIntermediateSize; + } + + @Override + public void init(ByteBuffer buf, int position) { + if (nm == null) { + nm = new NativeMemory(buf); + } + Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); + register(position, mem); + } + + public abstract void register(int position,Memory mem); + + @Override + public void aggregate(ByteBuffer buf, int position) { + Object key = selector.get(); + double[] values = new double[selectors.size()]; + for(int i=0;i selectors, int size, + int valuesCount,int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + + @Override + public void update(ByteBuffer buf,int position,Object key) { + ArrayOfDoublesSketch sketch = SketchAggregator.parseSketch(key); + + ArrayOfDoublesUnion union = unions.get(position); + + ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildIntersection(); + intersection.update(sketch, getCombiner()); + intersection.update(union.getResult(), getCombiner()); + + ArrayOfDoublesAnotB anotb = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + anotb.update(union.getResult(), sketch); + + ArrayOfDoublesAnotB bnota = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(size).setNumberOfValues(valuesCount).buildAnotB(); + bnota.update(sketch,union.getResult()); + + union.reset(); + union.update(intersection.getResult()); + union.update(anotb.getResult()); + union.update(bnota.getResult()); + + } + + public abstract ArrayOfDoublesCombiner getCombiner(); + + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java new file mode 100644 index 000000000000..a336f86d4e69 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMaxBufferAggregator.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.memory.MemoryRegion; +import com.yahoo.sketches.memory.NativeMemory; +import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({ "rawtypes", "unused" }) +public class SketchMaxBufferAggregator extends SketchIntersectionBufferAggregator{ + + public SketchMaxBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount, int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.max; + } + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java new file mode 100644 index 000000000000..f074a3ced88c --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchMinBufferAggregator.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.memory.MemoryRegion; +import com.yahoo.sketches.memory.NativeMemory; +import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; +import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner; +import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({ "rawtypes", "unused" }) +public class SketchMinBufferAggregator extends SketchIntersectionBufferAggregator{ + + public SketchMinBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount, int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + @Override + public ArrayOfDoublesCombiner getCombiner() { + return SketchAggregator.min; + } + +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java new file mode 100644 index 000000000000..69c8006ee23a --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/doublearray/bufferaggregator/SketchUnionBufferAggregator.java @@ -0,0 +1,97 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple.doublearray.bufferaggregator; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.metamx.common.IAE; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.yahoo.sketches.ResizeFactor; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.memory.MemoryRegion; +import com.yahoo.sketches.memory.NativeMemory; +import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Union; +import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder; +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUnion; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.datasketches.tuple.doublearray.aggregator.SketchAggregator; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.ObjectColumnSelector; + +/** + * + * @author sunxin@rongcapital.cn + * + */ +@SuppressWarnings({ "rawtypes", "unused" }) +public class SketchUnionBufferAggregator extends SketchBufferAggregator { + + protected final Map unions = new HashMap<>(); + + public SketchUnionBufferAggregator(ObjectColumnSelector selector, List selectors, int size, + int valuesCount,int maxIntermediateSize) { + super(selector, selectors, size, valuesCount, maxIntermediateSize); + } + + @Override + public void register(int position,Memory mem) { + unions.put(position, new ArrayOfDoublesSetOperationBuilder().setMemory(mem).setNominalEntries(size).setNumberOfValues(valuesCount).buildUnion()); + + } + + @Override + public void update(ByteBuffer buf,int position,Object key) { + SketchAggregator.updateUnion(getUnion(buf,position),key); + } + + + + @Override + public Object get(ByteBuffer buf, int position) { + return getUnion(buf, position).getResult(); + } + + + @Override + public void close() { + unions.clear(); + } + + // Note that this is not threadsafe and I don't think it needs to be + private ArrayOfDoublesUnion getUnion(ByteBuffer buf, int position) { + ArrayOfDoublesUnion union = unions.get(position); + if (union == null) { + Memory mem = new MemoryRegion(nm, position, maxIntermediateSize); + union = new ArrayOfDoublesSetOperationBuilder().setMemory(mem).setNominalEntries(this.size).setNumberOfValues(this.valuesCount).buildUnion(); + unions.put(position, union); + } + return union; + } + +} diff --git a/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule index efd10f3a7e78..174de09f9f6f 100644 --- a/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +++ b/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -1,2 +1,3 @@ io.druid.query.aggregation.datasketches.theta.SketchModule io.druid.query.aggregation.datasketches.theta.oldapi.OldApiSketchModule +io.druid.query.aggregation.datasketches.tuple.doublearray.SketchModule \ No newline at end of file