()
+ {
+ @Override
+ public boolean isNull()
+ {
+ return getObject() == null;
+ }
+
+ private Number getOrZero()
+ {
+ SpectatorHistogram histogram = getObject();
+ return histogram != null ? histogram : ZERO;
+ }
+
+ @Override
+ public long getLong()
+ {
+ return getOrZero().longValue();
+ }
+
+ @Override
+ public float getFloat()
+ {
+ return getOrZero().floatValue();
+ }
+
+ @Override
+ public double getDouble()
+ {
+ return getOrZero().doubleValue();
+ }
+
+ @Nullable
+ @Override
+ public SpectatorHistogram getObject()
+ {
+ return (SpectatorHistogram) getRowValue(offset.getOffset());
+ }
+
+ @Override
+ public Class classOfObject()
+ {
+ return getClazz();
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("column", SpectatorHistogramIndexBasedComplexColumn.this);
+ }
+ };
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexed.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexed.java
new file mode 100644
index 000000000000..54b76bb05f32
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramIndexed.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.data.CloseableIndexed;
+import org.apache.druid.segment.data.IndexedIterable;
+import org.apache.druid.segment.data.ObjectStrategy;
+import org.apache.druid.segment.serde.Serializer;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Iterator;
+
+/**
+ * A generic, flat storage mechanism. Use static SpectatorHistogramSerializer.create to construct.
+ * Supports efficient storage for sparse columns that contain lots of nulls.
+ *
+ * Storage Format:
+ *
+ * byte 1: version (0x1)
+ * byte 2: reserved flags
+ * bytes 3-6 => numBytesUsed for header and values
+ * bytes 7-some => header including count, bitmap of present values and offsets to values.
+ * bytes (header.serializedSize + 6)-(numBytesUsed + 6): bytes representing the values. If offset is null, then the value is null.
+ */
+public class SpectatorHistogramIndexed implements CloseableIndexed, Serializer
+{
+ static final byte VERSION_ONE = 0x1;
+ static final byte RESERVED_FLAGS = 0x0;
+
+ public static SpectatorHistogramIndexed read(ByteBuffer buffer, ObjectStrategy strategy)
+ {
+ byte versionFromBuffer = buffer.get();
+
+ if (VERSION_ONE == versionFromBuffer) {
+ // Reserved flags, not currently used
+ buffer.get();
+ int sizeOfOffsetsAndValues = buffer.getInt();
+ ByteBuffer bufferToUse = buffer.slice();
+ bufferToUse.limit(sizeOfOffsetsAndValues);
+
+ buffer.position(buffer.position() + sizeOfOffsetsAndValues);
+
+ return new SpectatorHistogramIndexed(
+ bufferToUse,
+ strategy
+ );
+ }
+ throw new IAE("Unknown version[%d]", (int) versionFromBuffer);
+ }
+
+ private final ObjectStrategy strategy;
+ private final int size;
+ private final NullableOffsetsHeader offsetsHeader;
+ private final ByteBuffer valueBuffer;
+
+ private SpectatorHistogramIndexed(
+ ByteBuffer buffer,
+ ObjectStrategy strategy
+ )
+ {
+ this.strategy = strategy;
+ offsetsHeader = NullableOffsetsHeader.read(buffer);
+ // Size is count of entries
+ size = offsetsHeader.size();
+ // The rest of the buffer is the values
+ valueBuffer = buffer.slice();
+ }
+
+ /**
+ * Checks if {@code index} a valid `element index` in SpectatorHistogramIndexed.
+ * Similar to Preconditions.checkElementIndex() except this method throws {@link IAE} with custom error message.
+ *
+ * Used here to get existing behavior(same error message and exception) of V1 GenericIndexed.
+ *
+ * @param index index identifying an element of an SpectatorHistogramIndexed.
+ */
+ private void checkIndex(int index)
+ {
+ if (index < 0) {
+ throw new IAE("Index[%s] < 0", index);
+ }
+ if (index >= size) {
+ throw new IAE("Index[%d] >= size[%d]", index, size);
+ }
+ }
+
+ public Class extends SpectatorHistogram> getClazz()
+ {
+ return strategy.getClazz();
+ }
+
+ @Override
+ public int size()
+ {
+ return size;
+ }
+
+ @Nullable
+ @Override
+ public SpectatorHistogram get(int index)
+ {
+ checkIndex(index);
+
+ NullableOffsetsHeader.Offset offset = offsetsHeader.get(index);
+ if (offset == null) {
+ return null;
+ }
+
+ ByteBuffer copyValueBuffer = valueBuffer.asReadOnlyBuffer();
+ copyValueBuffer.position(offset.getStart());
+ copyValueBuffer.limit(offset.getStart() + offset.getLength());
+
+ return strategy.fromByteBuffer(copyValueBuffer, offset.getLength());
+ }
+
+ @Override
+ public int indexOf(@Nullable SpectatorHistogram value)
+ {
+ throw new UnsupportedOperationException("Reverse lookup not allowed.");
+ }
+
+ @Override
+ public Iterator iterator()
+ {
+ return IndexedIterable.create(this).iterator();
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ throw new UnsupportedOperationException("Serialization not supported here");
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
+ {
+ throw new UnsupportedOperationException("Serialization not supported here");
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+ inspector.visit("headerBuffer", offsetsHeader);
+ inspector.visit("firstValueBuffer", valueBuffer);
+ inspector.visit("strategy", strategy);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SpectatorHistogramIndexed[" + "size: "
+ + size()
+ + " cardinality: "
+ + offsetsHeader.getCardinality()
+ + ']';
+ }
+
+ @Override
+ public void close()
+ {
+ // nothing to close
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramJsonSerializer.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramJsonSerializer.java
new file mode 100644
index 000000000000..fb0a32b45024
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramJsonSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+
+public class SpectatorHistogramJsonSerializer extends JsonSerializer
+{
+ @Override
+ public void serialize(
+ SpectatorHistogram spectatorHistogram,
+ JsonGenerator jsonGenerator,
+ SerializerProvider serializerProvider
+ ) throws IOException
+ {
+ spectatorHistogram.serialize(jsonGenerator, serializerProvider);
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramModule.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramModule.java
new file mode 100644
index 000000000000..b12c600d6b42
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramModule.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.segment.serde.ComplexMetrics;
+
+import java.util.List;
+
+/**
+ * Module defining various aggregators for Spectator Histograms
+ */
+public class SpectatorHistogramModule implements DruidModule
+{
+ @VisibleForTesting
+ public static void registerSerde()
+ {
+ ComplexMetrics.registerSerde(
+ SpectatorHistogramAggregatorFactory.TYPE_NAME,
+ new SpectatorHistogramComplexMetricSerde(SpectatorHistogramAggregatorFactory.TYPE_NAME)
+ );
+ ComplexMetrics.registerSerde(
+ SpectatorHistogramAggregatorFactory.Timer.TYPE_NAME,
+ new SpectatorHistogramComplexMetricSerde(SpectatorHistogramAggregatorFactory.Timer.TYPE_NAME)
+ );
+ ComplexMetrics.registerSerde(
+ SpectatorHistogramAggregatorFactory.Distribution.TYPE_NAME,
+ new SpectatorHistogramComplexMetricSerde(SpectatorHistogramAggregatorFactory.Distribution.TYPE_NAME)
+ );
+ }
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(
+ new SimpleModule(
+ getClass().getSimpleName()
+ ).registerSubtypes(
+ new NamedType(
+ SpectatorHistogramAggregatorFactory.class,
+ SpectatorHistogramAggregatorFactory.TYPE_NAME
+ ),
+ new NamedType(
+ SpectatorHistogramAggregatorFactory.Timer.class,
+ SpectatorHistogramAggregatorFactory.Timer.TYPE_NAME
+ ),
+ new NamedType(
+ SpectatorHistogramAggregatorFactory.Distribution.class,
+ SpectatorHistogramAggregatorFactory.Distribution.TYPE_NAME
+ ),
+ new NamedType(
+ SpectatorHistogramPercentilePostAggregator.class,
+ SpectatorHistogramPercentilePostAggregator.TYPE_NAME
+ ),
+ new NamedType(
+ SpectatorHistogramPercentilesPostAggregator.class,
+ SpectatorHistogramPercentilesPostAggregator.TYPE_NAME
+ )
+ ).addSerializer(SpectatorHistogram.class, new SpectatorHistogramJsonSerializer())
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ registerSerde();
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramObjectStrategy.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramObjectStrategy.java
new file mode 100644
index 000000000000..33b59bd6ad6d
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramObjectStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import org.apache.druid.segment.data.ObjectStrategy;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+public class SpectatorHistogramObjectStrategy implements ObjectStrategy
+{
+ private static final byte[] EMPTY_BYTES = null;
+
+ @Override
+ public Class getClazz()
+ {
+ return SpectatorHistogram.class;
+ }
+
+ @Override
+ public SpectatorHistogram fromByteBuffer(ByteBuffer readOnlyBuffer, int numBytes)
+ {
+ if (numBytes == 0) {
+ return null;
+ }
+ return SpectatorHistogram.fromByteBuffer(readOnlyBuffer);
+ }
+
+ @Override
+ public byte[] toBytes(@Nullable SpectatorHistogram val)
+ {
+ if (val == null) {
+ return EMPTY_BYTES;
+ }
+ return val.toBytes();
+ }
+
+ @Override
+ public int compare(SpectatorHistogram o1, SpectatorHistogram o2)
+ {
+ return SpectatorHistogramAggregatorFactory.COMPARATOR.compare(o1, o2);
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java
new file mode 100644
index 000000000000..80854c57d460
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Doubles;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.post.PostAggregatorIds;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class SpectatorHistogramPercentilePostAggregator implements PostAggregator
+{
+
+ private final String name;
+ private final PostAggregator field;
+
+ private final double percentile;
+
+ public static final String TYPE_NAME = "percentileSpectatorHistogram";
+
+ @JsonCreator
+ public SpectatorHistogramPercentilePostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field,
+ @JsonProperty("percentile") final double percentile
+ )
+ {
+ this.name = Preconditions.checkNotNull(name, "name is null");
+ this.field = Preconditions.checkNotNull(field, "field is null");
+ Preconditions.checkArgument(
+ percentile >= 0 && percentile <= 100,
+ "Percentile argument not in range (0, 100)"
+ );
+ this.percentile = percentile;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public ColumnType getType(ColumnInspector signature)
+ {
+ return ColumnType.DOUBLE;
+ }
+
+ @JsonProperty
+ public PostAggregator getField()
+ {
+ return field;
+ }
+
+ @JsonProperty
+ public double getPercentile()
+ {
+ return percentile;
+ }
+
+ @Override
+ public Object compute(final Map combinedAggregators)
+ {
+ final SpectatorHistogram sketch = (SpectatorHistogram) field.compute(combinedAggregators);
+ return sketch.getPercentileValue(percentile);
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return Doubles::compare;
+ }
+
+ @Override
+ public Set getDependentFields()
+ {
+ return field.getDependentFields();
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{" +
+ "name='" + name + '\'' +
+ ", field=" + field +
+ ", fraction=" + percentile +
+ "}";
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ final CacheKeyBuilder builder = new CacheKeyBuilder(
+ PostAggregatorIds.SPECTATOR_HISTOGRAM_SKETCH_PERCENTILE_CACHE_TYPE_ID).appendCacheable(field);
+ builder.appendDouble(percentile);
+ return builder.build();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SpectatorHistogramPercentilePostAggregator that = (SpectatorHistogramPercentilePostAggregator) o;
+ return Double.compare(that.percentile, percentile) == 0 &&
+ Objects.equals(name, that.name) &&
+ Objects.equals(field, that.field);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(name, field, percentile);
+ }
+
+ @Override
+ public PostAggregator decorate(final Map map)
+ {
+ return this;
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilesPostAggregator.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilesPostAggregator.java
new file mode 100644
index 000000000000..11ce9e0d9bd4
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilesPostAggregator.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Doubles;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.aggregation.post.PostAggregatorIds;
+import org.apache.druid.query.cache.CacheKeyBuilder;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnType;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Set;
+
+public class SpectatorHistogramPercentilesPostAggregator implements PostAggregator
+{
+ private final String name;
+ private final PostAggregator field;
+
+ private final double[] percentiles;
+
+ public static final String TYPE_NAME = "percentilesSpectatorHistogram";
+
+ @JsonCreator
+ public SpectatorHistogramPercentilesPostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field,
+ @JsonProperty("percentiles") final double[] percentiles
+ )
+ {
+ this.name = Preconditions.checkNotNull(name, "name is null");
+ this.field = Preconditions.checkNotNull(field, "field is null");
+ this.percentiles = Preconditions.checkNotNull(percentiles, "array of fractions is null");
+ Preconditions.checkArgument(this.percentiles.length >= 1, "Array of percentiles cannot " +
+ "be empty");
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public ColumnType getType(ColumnInspector signature)
+ {
+ return ColumnType.DOUBLE_ARRAY;
+ }
+
+ @JsonProperty
+ public PostAggregator getField()
+ {
+ return field;
+ }
+
+ @JsonProperty
+ public double[] getPercentiles()
+ {
+ return percentiles;
+ }
+
+ @Override
+ public Object compute(final Map combinedAggregators)
+ {
+ final SpectatorHistogram sketch = (SpectatorHistogram) field.compute(combinedAggregators);
+ return sketch.getPercentileValues(percentiles);
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return Doubles::compare;
+ }
+
+ @Override
+ public Set getDependentFields()
+ {
+ return field.getDependentFields();
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + "{" +
+ "name='" + name + '\'' +
+ ", field=" + field +
+ ", percentiles=" + Arrays.toString(percentiles) +
+ "}";
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ final CacheKeyBuilder builder = new CacheKeyBuilder(
+ PostAggregatorIds.SPECTATOR_HISTOGRAM_SKETCH_PERCENTILES_CACHE_TYPE_ID).appendCacheable(field);
+ for (final double value : percentiles) {
+ builder.appendDouble(value);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final SpectatorHistogramPercentilesPostAggregator that = (SpectatorHistogramPercentilesPostAggregator) o;
+ if (!name.equals(that.name)) {
+ return false;
+ }
+ if (!Arrays.equals(percentiles, that.percentiles)) {
+ return false;
+ }
+ return field.equals(that.field);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return (name.hashCode() * 31 + field.hashCode()) * 31 + Arrays.hashCode(percentiles);
+ }
+
+ @Override
+ public PostAggregator decorate(final Map map)
+ {
+ return this;
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramSerializer.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramSerializer.java
new file mode 100644
index 000000000000..2e4608fee788
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramSerializer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.google.common.primitives.Ints;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.GenericColumnSerializer;
+import org.apache.druid.segment.data.ColumnCapacityExceededException;
+import org.apache.druid.segment.data.ObjectStrategy;
+import org.apache.druid.segment.serde.MetaSerdeHelper;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.WriteOutBytes;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class SpectatorHistogramSerializer implements GenericColumnSerializer
+{
+ private static final MetaSerdeHelper META_SERDE_HELPER = MetaSerdeHelper
+ .firstWriteByte((SpectatorHistogramSerializer x) -> SpectatorHistogramIndexed.VERSION_ONE)
+ .writeByte(x -> SpectatorHistogramIndexed.RESERVED_FLAGS)
+ // numBytesUsed field is header + values (i.e. all bytes _after_ this)
+ .writeInt(x -> Ints.checkedCast(x.offsetsHeader.getSerializedSize() + x.valuesBuffer.size()));
+
+ public static SpectatorHistogramSerializer create(
+ SegmentWriteOutMedium segmentWriteOutMedium,
+ String columnName,
+ ObjectStrategy strategy
+ )
+ {
+ return new SpectatorHistogramSerializer(
+ columnName,
+ segmentWriteOutMedium,
+ strategy
+ );
+ }
+
+ private final String columnName;
+ private final SegmentWriteOutMedium segmentWriteOutMedium;
+ private final ObjectStrategy objectStrategy;
+ private NullableOffsetsHeader offsetsHeader;
+ private WriteOutBytes valuesBuffer;
+
+ private int rowCount = 0;
+
+ private SpectatorHistogramSerializer(
+ String columnName,
+ SegmentWriteOutMedium segmentWriteOutMedium,
+ ObjectStrategy strategy
+ )
+ {
+ this.columnName = columnName;
+ this.segmentWriteOutMedium = segmentWriteOutMedium;
+ this.objectStrategy = strategy;
+ }
+
+ @Override
+ public void open() throws IOException
+ {
+ this.offsetsHeader = NullableOffsetsHeader.create(segmentWriteOutMedium);
+ this.valuesBuffer = segmentWriteOutMedium.makeWriteOutBytes();
+ }
+
+ @Override
+ public void serialize(ColumnValueSelector> selector) throws IOException
+ {
+ rowCount++;
+ if (rowCount < 0) {
+ throw new ColumnCapacityExceededException(columnName);
+ }
+ Object value = selector.getObject();
+ if (value == null) {
+ offsetsHeader.writeNull();
+ } else {
+ objectStrategy.writeTo((SpectatorHistogram) value, valuesBuffer);
+ offsetsHeader.writeOffset(Ints.checkedCast(valuesBuffer.size()));
+ }
+ }
+
+ @Override
+ public long getSerializedSize()
+ {
+ // Meta header, Offsets, Values
+ return META_SERDE_HELPER.size(this) + offsetsHeader.getSerializedSize() + valuesBuffer.size();
+ }
+
+ @Override
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
+ {
+ META_SERDE_HELPER.writeTo(channel, this);
+ offsetsHeader.writeTo(channel, null);
+ valuesBuffer.writeTo(channel);
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/spectator-histogram/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 000000000000..f158b84da3f9
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.spectator.histogram.SpectatorHistogramModule
diff --git a/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/NullableOffsetsHeaderTest.java b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/NullableOffsetsHeaderTest.java
new file mode 100644
index 000000000000..add0d88efceb
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/NullableOffsetsHeaderTest.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
+
+public class NullableOffsetsHeaderTest
+{
+ // Null-only writes: size() counts null entries and get() returns null for
+ // each index, both before and after a serialize/deserialize round trip.
+ @Test
+ public void testShouldAcceptNullWrites() throws IOException
+ {
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ header.writeNull();
+ header.writeNull();
+ header.writeNull();
+
+ Assert.assertEquals("Size should be count of entries", 3, header.size());
+
+ header = serde(header);
+ Assert.assertEquals("Size should be count of entries", 3, header.size());
+
+ Assert.assertNull("Should return null for null entries by index", header.get(0));
+ Assert.assertNull("Should return null for null entries by index", header.get(1));
+ Assert.assertNull("Should return null for null entries by index", header.get(2));
+ }
+
+ // Offset-only writes: each entry's Offset spans from the previous written
+ // offset (0 for the first) to its own value, surviving a serde round trip.
+ @Test
+ public void testShouldAcceptOffsetWrites() throws IOException
+ {
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ header.writeOffset(123);
+ header.writeOffset(456);
+
+ Assert.assertEquals("Size should be count of entries", 2, header.size());
+
+ header = serde(header);
+ Assert.assertEquals("Size should be count of entries", 2, header.size());
+
+ Assert.assertNotNull("Should flag nulls by index", header.get(0));
+ Assert.assertNotNull("Should flag nulls by index", header.get(1));
+
+ Assert.assertEquals("Should return value for entries by index", 0, header.get(0).getStart());
+ Assert.assertEquals("Should return value for entries by index", 123, header.get(0).getEnd());
+ Assert.assertEquals("Should return value for entries by index", 123, header.get(1).getStart());
+ Assert.assertEquals("Should return value for entries by index", 456, header.get(1).getEnd());
+ }
+
+ // Interleaved null and offset writes: nulls return null while non-null
+ // entries chain start/end from the previous NON-NULL offset (nulls are
+ // skipped when computing an entry's start).
+ @Test
+ public void testShouldAcceptMixedWrites() throws IOException
+ {
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ header.writeOffset(123);
+ header.writeNull();
+ header.writeNull();
+ header.writeOffset(456);
+ header.writeOffset(789);
+ header.writeNull();
+
+ Assert.assertEquals("Size should be count of entries", 6, header.size());
+
+ header = serde(header);
+ Assert.assertEquals("Size should be count of entries", 6, header.size());
+
+ Assert.assertNotNull("Should flag nulls by index", header.get(0));
+ Assert.assertNull("Should flag nulls by index", header.get(1));
+ Assert.assertNull("Should flag nulls by index", header.get(2));
+ Assert.assertNotNull("Should flag nulls by index", header.get(3));
+ Assert.assertNotNull("Should flag nulls by index", header.get(4));
+ Assert.assertNull("Should flag nulls by index", header.get(5));
+
+ Assert.assertEquals("Should return value for entries by index", 0, header.get(0).getStart());
+ Assert.assertEquals("Should return value for entries by index", 123, header.get(0).getEnd());
+ Assert.assertEquals("Should return value for entries by index", 123, header.get(3).getStart());
+ Assert.assertEquals("Should return value for entries by index", 456, header.get(3).getEnd());
+ Assert.assertEquals("Should return value for entries by index", 456, header.get(4).getStart());
+ Assert.assertEquals("Should return value for entries by index", 789, header.get(4).getEnd());
+ }
+
+ // Random-access reads in reverse order, including an out-of-range index
+ // (6, one past the end), which must return null rather than throw.
+ @Test
+ public void testGiveAccessToOffsets() throws IOException
+ {
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ header.writeOffset(123);
+ header.writeNull();
+ header.writeNull();
+ header.writeOffset(456);
+ header.writeOffset(789);
+ header.writeNull();
+
+ header = serde(header);
+
+ Assert.assertNull("Should return null for 6", header.get(6));
+
+ Assert.assertNull("Should return null for 5", header.get(5));
+
+ Assert.assertEquals("Offset at 4", 789, header.get(4).getEnd());
+ Assert.assertEquals("Offset prior to 4", 456, header.get(4).getStart());
+
+ Assert.assertEquals("Offset at 3", 456, header.get(3).getEnd());
+ Assert.assertEquals("Offset prior to 3", 123, header.get(3).getStart());
+
+ Assert.assertNull("Should return null for 2", header.get(2));
+
+ Assert.assertNull("Should return null for 1", header.get(1));
+
+ Assert.assertEquals("Offset at 0", 123, header.get(0).getEnd());
+ Assert.assertEquals("Offset prior to 0", 0, header.get(0).getStart());
+ }
+
+ // A single non-null entry surrounded by nulls: get() on any other index,
+ // including negative (-1) and far past the end (100), returns null.
+ @Test
+ public void testGiveAccessToSingleOffsetNulls() throws IOException
+ {
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ header.writeNull();
+ header.writeOffset(123);
+ header.writeNull();
+ header.writeNull();
+ header.writeNull();
+
+ header = serde(header);
+
+ Assert.assertEquals("Offset at 1", 123, header.get(1).getEnd());
+ Assert.assertEquals("Offset prior to 1", 0, header.get(1).getStart());
+
+ Assert.assertNull("Nulls for anything not set", header.get(0));
+ Assert.assertNull("Nulls for anything not set", header.get(-1));
+ Assert.assertNull("Nulls for anything not set", header.get(3));
+ Assert.assertNull("Nulls for anything not set", header.get(100));
+ }
+
+ // Pins the serialized layout: 4-byte length + 4-byte bitmap length + 1-byte
+ // null bitmap + 4 bytes per non-null offset (3 offsets -> 12). Also verifies
+ // getSerializedSize() matches the bytes actually written, and that every
+ // entry round-trips (Offset values for non-nulls, null otherwise).
+ @Test
+ public void testShouldSerializeAndDeserialize() throws IOException
+ {
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ header.writeOffset(123);
+ header.writeNull();
+ header.writeNull();
+ header.writeOffset(456);
+ header.writeOffset(789);
+ header.writeNull();
+
+ // Length + BitmapLength + Bitmap + Offsets
+ // 4 + 4 + 1 + 12 = 21 bytes
+ Assert.assertEquals("Serialized size should be minimal", 21, header.getSerializedSize());
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final WritableByteChannel channel = Channels.newChannel(baos);
+ header.writeTo(channel, null);
+ channel.close();
+
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray());
+ Assert.assertEquals(
+ "Reported size and actual size should match",
+ header.getSerializedSize(),
+ byteBuffer.remaining()
+ );
+
+ NullableOffsetsHeader deserialized = NullableOffsetsHeader.read(byteBuffer);
+ Assert.assertEquals(0, byteBuffer.remaining());
+
+ Assert.assertEquals("Deserialized should match pre-serialized size", header.size(), deserialized.size());
+
+ // Nulls should return the previous offset
+ // NOTE(review): raw List — the generic parameter (presumably
+ // List<NullableOffsetsHeader.Offset>) looks stripped by extraction; confirm.
+ List expected = Arrays.asList(
+ new NullableOffsetsHeader.Offset(0, 123),
+ null,
+ null,
+ new NullableOffsetsHeader.Offset(123, 456),
+ new NullableOffsetsHeader.Offset(456, 789),
+ null
+ );
+
+ for (int i = 0; i < header.size(); i++) {
+ Assert.assertEquals("Deserialized should match pre-serialized values", expected.get(i), deserialized.get(i));
+ }
+ }
+
+ // Degenerate all-null case: the bitmap and offsets sections are omitted
+ // entirely, so 10,000 null entries serialize to just the two 4-byte headers.
+ @Test
+ public void testShouldSerializeAndDeserializeAllNulls() throws IOException
+ {
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ for (int i = 0; i < 10000; i++) {
+ header.writeNull();
+ }
+
+ // Length + BitmapLength + Bitmap + Offsets
+ // 4 + 4 + 0 + 0 = 8 bytes
+ Assert.assertEquals("Serialized size should be minimal", 8, header.getSerializedSize());
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final WritableByteChannel channel = Channels.newChannel(baos);
+ header.writeTo(channel, null);
+ channel.close();
+
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray());
+ Assert.assertEquals(
+ "Reported size and actual size should match",
+ header.getSerializedSize(),
+ byteBuffer.remaining()
+ );
+
+ NullableOffsetsHeader deserialized = NullableOffsetsHeader.read(byteBuffer);
+ Assert.assertEquals(0, byteBuffer.remaining());
+
+ Assert.assertEquals("Deserialized should match pre-serialized size", header.size(), deserialized.size());
+
+ for (int i = 0; i < header.size(); i++) {
+ Assert.assertNull("Deserialized should be null", deserialized.get(i));
+ }
+ }
+
+ // Degenerate all-set case: when every entry is non-null the bitmap is
+ // omitted, leaving the two 4-byte headers plus 4 bytes per offset
+ // (10,000 offsets -> 40,000 bytes).
+ @Test
+ public void testShouldSerializeAndDeserializeAllValues() throws IOException
+ {
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ for (int i = 0; i < 10000; i++) {
+ header.writeOffset(i + 1);
+ }
+
+ // Length + BitmapLength + Bitmap + Offsets
+ // 4 + 4 + 0 + 40000 = 40008 bytes
+ // Bitmap is omitted if all values are set
+ Assert.assertEquals("Serialized size should be minimal", 40008, header.getSerializedSize());
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final WritableByteChannel channel = Channels.newChannel(baos);
+ header.writeTo(channel, null);
+ channel.close();
+
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray());
+ Assert.assertEquals(
+ "Reported size and actual size should match",
+ header.getSerializedSize(),
+ byteBuffer.remaining()
+ );
+
+ NullableOffsetsHeader deserialized = NullableOffsetsHeader.read(byteBuffer);
+ Assert.assertEquals(0, byteBuffer.remaining());
+
+ Assert.assertEquals("Deserialized should match pre-serialized size", header.size(), deserialized.size());
+
+ for (int i = 0; i < header.size(); i++) {
+ Assert.assertNotNull("Deserialized should be set " + i, deserialized.get(i));
+ Assert.assertEquals("Deserialized should match pre-serialized nulls " + i, i + 1, deserialized.get(i).getEnd());
+ }
+ }
+
+ // getOffsetIndex(): maps an entry index to its rank among non-null entries
+ // (or negative when null), here with all set bits inside one 32-entry word.
+ @Test
+ public void testShouldFindOffsetFromIndexSingleWord() throws IOException
+ {
+ // Should return the exact index of the offset to read, or negative if not present
+ // NOTE(review): raw List — presumably List<Integer>, generics stripped by extraction.
+ List expectedOffsetIndexes = ImmutableList.of(15, 21, 30, 31);
+ NullableOffsetsHeader header = createHeaderWithIndexesSet(expectedOffsetIndexes);
+ Assert.assertEquals("Size should be count of entries", 32, header.size());
+ header = serde(header);
+
+ for (int i = 0; i < header.size(); i++) {
+ int offsetIndex = header.getOffsetIndex(i);
+ int expected = expectedOffsetIndexes.indexOf(i);
+ Assert.assertEquals("Offset " + i, expected, offsetIndex);
+ }
+ }
+
+ // Same as the single-word test, but with set bits spanning multiple
+ // bitmap words (indexes up to 100) to exercise cross-word rank counting.
+ @Test
+ public void testShouldFindOffsetFromIndexMultipleWords() throws IOException
+ {
+ // Should return the exact index of the offset to read, or negative if not present
+ List expectedOffsetIndexes = ImmutableList.of(15, 21, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 70, 100);
+ NullableOffsetsHeader header = createHeaderWithIndexesSet(expectedOffsetIndexes);
+ Assert.assertEquals("Size should be count of entries", 101, header.size());
+ header = serde(header);
+
+ for (int i = 0; i < header.size(); i++) {
+ int offsetIndex = header.getOffsetIndex(i);
+ int expected = expectedOffsetIndexes.indexOf(i);
+ Assert.assertEquals("Offset " + i, expected, offsetIndex);
+ }
+ }
+
+ // Fully-populated header (bitmap omitted): getOffsetIndex must degenerate
+ // to the identity mapping.
+ @Test
+ public void testShouldFindOffsetFromIndexFull() throws IOException
+ {
+ // For a full header, the bitset is omitted.
+ // The expected index, is the queried index.
+ final int size = 500;
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ for (int i = 0; i < size; i++) {
+ header.writeOffset(i + 1);
+ }
+ Assert.assertEquals("Size should be count of entries", size, header.size());
+ header = serde(header);
+
+ for (int i = 0; i < size; i++) {
+ int offsetIndex = header.getOffsetIndex(i);
+ Assert.assertEquals("Offset " + i, i, offsetIndex);
+ }
+ }
+
+ // Fully-null header (bitmap omitted): getOffsetIndex must return -1 for
+ // every queried index.
+ @Test
+ public void testShouldFindOffsetFromIndexEmpty() throws IOException
+ {
+ // For an empty header, the bitset is omitted.
+ // The expected index, is always -1.
+ final int size = 500;
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ for (int i = 0; i < size; i++) {
+ header.writeNull();
+ }
+ Assert.assertEquals("Size should be count of entries", size, header.size());
+ header = serde(header);
+
+ for (int i = 0; i < size; i++) {
+ int offsetIndex = header.getOffsetIndex(i);
+ Assert.assertEquals("Offset " + i, -1, offsetIndex);
+ }
+ }
+
+ // Regression guard for bits set beyond the first 64-bit word (indexes 256,
+ // 257): cardinality, offset indexes, and start/end/length must all hold.
+ // Each helper-written offset is sequential, so entry k's Offset is (k, k+1).
+ @Test
+ public void testShouldWorkWithBitsSetAfter64bitBoundary() throws IOException
+ {
+ List expectedOffsetIndexes = ImmutableList.of(0, 1, 2, 3, 4, 256, 257);
+ NullableOffsetsHeader header = createHeaderWithIndexesSet(expectedOffsetIndexes);
+ Assert.assertEquals("Size should be count of entries", 258, header.size());
+ header = serde(header);
+ Assert.assertEquals("Size should be count of entries", 258, header.size());
+ Assert.assertEquals("Cardinality should be count of non-nulls", expectedOffsetIndexes.size(), header.getCardinality());
+
+ for (int i = 0; i < header.size(); i++) {
+ int offsetIndex = header.getOffsetIndex(i);
+ int expectedOffset = expectedOffsetIndexes.indexOf(i);
+ Assert.assertEquals("Offset " + i, expectedOffset, offsetIndex);
+
+ NullableOffsetsHeader.Offset offset = header.get(i);
+ if (expectedOffset < 0) {
+ Assert.assertNull("Null Offset " + i, offset);
+ } else {
+ int expectedOffsetStart = expectedOffset;
+ int expectedOffsetEnd = expectedOffset + 1;
+ Assert.assertEquals("Offset Start " + i, expectedOffsetStart, offset.getStart());
+ Assert.assertEquals("Offset End " + i, expectedOffsetEnd, offset.getEnd());
+ Assert.assertEquals("Offset Length " + i, 1, offset.getLength());
+ }
+ }
+ }
+
+ // Sweeps headers sized just below every power-of-two boundary (2^x - 1 for
+ // x in [1, 24)) with a single set bit at the last index, exercising bitmap
+ // edge cases at byte and word boundaries.
+ @Test
+ public void testShouldWorkOnLongByteBoundaries() throws IOException
+ {
+ for (int x = 1; x < 24; x++) {
+ int boundary = ((int) Math.pow(2, x)) - 1;
+ List expectedOffsetIndexes = ImmutableList.of(boundary - 1);
+ NullableOffsetsHeader header = createHeaderWithIndexesSet(expectedOffsetIndexes);
+ Assert.assertEquals("Size should be count of entries", boundary, header.size());
+ header = serde(header);
+ Assert.assertEquals("Size should be count of entries", boundary, header.size());
+ Assert.assertEquals(
+ "Cardinality should be count of non-nulls",
+ expectedOffsetIndexes.size(),
+ header.getCardinality()
+ );
+
+ for (int i = 0; i < header.size(); i++) {
+ int offsetIndex = header.getOffsetIndex(i);
+ int expectedOffset = expectedOffsetIndexes.indexOf(i);
+ Assert.assertEquals("Offset " + i, expectedOffset, offsetIndex);
+
+ NullableOffsetsHeader.Offset offset = header.get(i);
+ if (expectedOffset < 0) {
+ Assert.assertNull("Null Offset " + i, offset);
+ } else {
+ int expectedOffsetStart = expectedOffset;
+ int expectedOffsetEnd = expectedOffset + 1;
+ Assert.assertEquals("Offset Start " + i, expectedOffsetStart, offset.getStart());
+ Assert.assertEquals("Offset End " + i, expectedOffsetEnd, offset.getEnd());
+ Assert.assertEquals("Offset Length " + i, 1, offset.getLength());
+ }
+ }
+ }
+ }
+
+ /**
+ * Test helper to serialize and deserialize a NullableOffsetsHeader.
+ * Writes the header to an in-memory channel (null smoosher — none is
+ * needed for this serialization) and reads it back from the bytes.
+ *
+ * @param in The NullableOffsetsHeader to serialize
+ * @return The deserialized representation of in.
+ * @throws IOException if writing to the in-memory channel fails
+ */
+ NullableOffsetsHeader serde(NullableOffsetsHeader in) throws IOException
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final WritableByteChannel channel = Channels.newChannel(baos);
+ in.writeTo(channel, null);
+ channel.close();
+
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray());
+ return NullableOffsetsHeader.read(byteBuffer);
+ }
+
+ /**
+ * Helper to make a header with the provided indexes set.
+ * Indexes not listed are filled with nulls; listed indexes receive
+ * sequential offsets starting at 1, so the k-th listed index gets offset
+ * k+1. Indexes must be provided in ascending order for the null padding
+ * loop to work.
+ *
+ * @throws IOException if the underlying write-out medium fails
+ */
+ NullableOffsetsHeader createHeaderWithIndexesSet(List indexes) throws IOException
+ {
+ NullableOffsetsHeader header = NullableOffsetsHeader.create(new OnHeapMemorySegmentWriteOutMedium());
+ int offset = 1;
+ for (Integer idx : indexes) {
+ while (header.size() < idx) {
+ header.writeNull();
+ }
+ header.writeOffset(offset++);
+ }
+ return header;
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java
new file mode 100644
index 000000000000..1c30cfc05c36
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.spectator.api.histogram.PercentileBuckets;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.Result;
+import org.apache.druid.query.aggregation.AggregationTestHelper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.AggregatorUtil;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
+import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
+import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
+import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
+import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
+import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class SpectatorHistogramAggregatorTest extends InitializedNullHandlingTest
+{
+ // TSV parse spec for the test fixtures: rows of (timestamp, product, cost)
+ // with "product" as the only dimension.
+ public static final String INPUT_DATA_PARSE_SPEC = String.join(
+ "\n",
+ "{",
+ " \"type\": \"string\",",
+ " \"parseSpec\": {",
+ " \"format\": \"tsv\",",
+ " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},",
+ " \"dimensionsSpec\": {",
+ " \"dimensions\": [\"product\"],",
+ " \"dimensionExclusions\": [],",
+ " \"spatialDimensions\": []",
+ " },",
+ " \"columns\": [\"timestamp\", \"product\", \"cost\"]",
+ " }",
+ "}"
+ );
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final SegmentMetadataQueryRunnerFactory METADATA_QR_FACTORY = new SegmentMetadataQueryRunnerFactory(
+ new SegmentMetadataQueryQueryToolChest(new SegmentMetadataQueryConfig()),
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER
+ );
+ // Expected per-product histograms, populated in the static initializer below.
+ // NOTE(review): raw Map — presumably Map<String, SpectatorHistogram>;
+ // the generic parameters look stripped by extraction, confirm.
+ private static final Map EXPECTED_HISTOGRAMS = new HashMap<>();
+
+ // Builds the expected histogram per product: each product's costs are
+ // summed first (by the ingestion-time longSum), so the expected histogram
+ // holds a single count in the percentile bucket of the summed cost.
+ static {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(10), 1L);
+ EXPECTED_HISTOGRAMS.put("A", histogram);
+
+ histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(30 + 40 + 40 + 40 + 50 + 50), 1L);
+ EXPECTED_HISTOGRAMS.put("B", histogram);
+
+ histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(50 + 20000), 1L);
+ EXPECTED_HISTOGRAMS.put("C", histogram);
+ }
+
+ private final AggregationTestHelper helper;
+ private final AggregationTestHelper timeSeriesHelper;
+
+ // Parameterized per GroupByQueryConfig (see constructorFeeder). Registers
+ // the complex-type serde once up front, then builds groupBy and timeseries
+ // aggregation test helpers sharing this module's Jackson modules.
+ public SpectatorHistogramAggregatorTest(final GroupByQueryConfig config)
+ {
+ SpectatorHistogramModule.registerSerde();
+ SpectatorHistogramModule module = new SpectatorHistogramModule();
+ helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+ module.getJacksonModules(), config, tempFolder);
+ timeSeriesHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
+ module.getJacksonModules(),
+ tempFolder
+ );
+ }
+
+ // Supplies one constructor argument set per GroupByQueryConfig variant so
+ // every test runs against each groupBy engine configuration.
+ // NOTE(review): the return type reads "Collection>" and the local is a raw
+ // List — generic parameters (presumably Collection<Object[]> /
+ // List<Object[]>) appear stripped by extraction; confirm against upstream.
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection> constructorFeeder()
+ {
+ final List constructors = new ArrayList<>();
+ for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) {
+ constructors.add(new Object[]{config});
+ }
+ return constructors;
+ }
+
+ // this is to test Json properties and equals
+ // Round-trips the aggregator factory through Jackson and relies on
+ // AggregatorFactory.equals to verify name/fieldName/cacheTypeId survive.
+ @Test
+ public void serializeDeserializeFactoryWithFieldName() throws Exception
+ {
+ ObjectMapper objectMapper = new DefaultObjectMapper();
+ new SpectatorHistogramModule().getJacksonModules().forEach(objectMapper::registerModule);
+ SpectatorHistogramAggregatorFactory factory = new SpectatorHistogramAggregatorFactory(
+ "name",
+ "fieldName", // fixed typo: was "filedName"
+ AggregatorUtil.SPECTATOR_HISTOGRAM_CACHE_TYPE_ID
+ );
+ AggregatorFactory other = objectMapper.readValue(
+ objectMapper.writeValueAsString(factory),
+ AggregatorFactory.class
+ );
+
+ Assert.assertEquals(factory, other);
+ }
+
+ // Ingests raw costs with a longSum, then builds spectator histograms at
+ // QUERY time (groupBy on product) and compares each group's histogram
+ // against the per-product expectations via assertResultsMatch.
+ // NOTE(review): raw Sequence/List — presumably Sequence<ResultRow> /
+ // List<ResultRow>; generics look stripped by extraction.
+ @Test
+ public void testBuildingHistogramQueryTime() throws Exception
+ {
+ Sequence seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"longSum\", \"name\": \"cost_sum\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"dimensions\": [\"product\"],",
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"cost_histogram\", \"fieldName\": "
+ + "\"cost_sum\"}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+ List results = seq.toList();
+ assertResultsMatch(results, 0, "A");
+ assertResultsMatch(results, 1, "B");
+ assertResultsMatch(results, 2, "C");
+ }
+
+ // Builds histograms at INGESTION time, then merges them with a groupBy
+ // (no dimensions -> single row) and checks the merged histogram holds one
+ // count per input cost bucket across all products.
+ @Test
+ public void testBuildingAndMergingHistograms() throws Exception
+ {
+ Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"dimensions\": [],", // fixed typo: was "dimenions" (key was silently ignored)
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"merged_cost_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.add(PercentileBuckets.indexOf(10), 1L);
+ expected.add(PercentileBuckets.indexOf(30), 1L);
+ expected.add(PercentileBuckets.indexOf(40), 3L);
+ expected.add(PercentileBuckets.indexOf(50), 3L);
+ expected.add(PercentileBuckets.indexOf(20000), 1L);
+
+ List<ResultRow> results = seq.toList();
+ Assert.assertEquals(1, results.size());
+ Assert.assertEquals(expected, results.get(0).get(0));
+ }
+
+ // Same merge scenario as above but through a timeseries query; the merged
+ // histogram is fetched from the result row's "merged_cost_histogram" metric.
+ // NOTE(review): raw Sequence>/List> — generic parameters (presumably
+ // Result<TimeseriesResultValue>) look stripped by extraction; confirm.
+ @Test
+ public void testBuildingAndMergingHistogramsTimeseriesQuery() throws Exception
+ {
+ Object rawseq = timeSeriesHelper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"timeseries\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"merged_cost_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.add(PercentileBuckets.indexOf(10), 1L);
+ expected.add(PercentileBuckets.indexOf(30), 1L);
+ expected.add(PercentileBuckets.indexOf(40), 3L);
+ expected.add(PercentileBuckets.indexOf(50), 3L);
+ expected.add(PercentileBuckets.indexOf(20000), 1L);
+
+ Sequence> seq = (Sequence>) rawseq;
+ List> results = seq.toList();
+ Assert.assertEquals(1, results.size());
+ SpectatorHistogram value = (SpectatorHistogram) results.get(0).getValue().getMetric("merged_cost_histogram");
+ Assert.assertEquals(expected, value);
+ }
+
+ // Merges ingestion-time histograms grouped by product. Expects 6 rows
+ // (one per product value in the fixture); rows 3-5 have null histograms,
+ // i.e. products with no cost values in the queried interval.
+ // NOTE(review): raw Sequence/List — presumably Sequence<ResultRow> /
+ // List<ResultRow>; generics look stripped by extraction.
+ @Test
+ public void testBuildingAndMergingGroupbyHistograms() throws Exception
+ {
+ Sequence seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"dimensions\": [\"product\"],",
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"merged_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+
+ List results = seq.toList();
+ Assert.assertEquals(6, results.size());
+
+ SpectatorHistogram expectedA = new SpectatorHistogram();
+ expectedA.add(PercentileBuckets.indexOf(10), 1L);
+ Assert.assertEquals(expectedA, results.get(0).get(1));
+
+ SpectatorHistogram expectedB = new SpectatorHistogram();
+ expectedB.add(PercentileBuckets.indexOf(30), 1L);
+ expectedB.add(PercentileBuckets.indexOf(40), 3L);
+ expectedB.add(PercentileBuckets.indexOf(50), 2L);
+ Assert.assertEquals(expectedB, results.get(1).get(1));
+
+ SpectatorHistogram expectedC = new SpectatorHistogram();
+ expectedC.add(PercentileBuckets.indexOf(50), 1L);
+ expectedC.add(PercentileBuckets.indexOf(20000), 1L);
+ Assert.assertEquals(expectedC, results.get(2).get(1));
+
+ Assert.assertNull(results.get(3).get(1));
+ Assert.assertNull(results.get(4).get(1));
+ Assert.assertNull(results.get(5).get(1));
+ }
+
+ // Verifies histogram columns can be consumed by plain numeric aggregators:
+ // longSum and doubleSum over a histogram column yield the total event
+ // count (9 rows in the fixture).
+ @Test
+ public void testBuildingAndCountingHistograms() throws Exception
+ {
+ Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"dimensions\": [],", // fixed typo: was "dimenions" (key was silently ignored)
+ " \"aggregations\": [",
+ " {\"type\": \"longSum\", \"name\": \"count_histogram\", \"fieldName\": "
+ + "\"histogram\"},",
+ " {\"type\": \"doubleSum\", \"name\": \"double_count_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+
+ List<ResultRow> results = seq.toList();
+ Assert.assertEquals(1, results.size());
+ // Check longSum
+ Assert.assertEquals(9L, results.get(0).get(0));
+ // Check doubleSum
+ Assert.assertEquals(9.0, (Double) results.get(0).get(1), 0.001);
+ }
+
+ // Same count-as-number scenario, but with an AND of two NOT-selector
+ // filters ("0" and "") on the histogram column; neither should match, so
+ // all 9 rows still count.
+ @Test
+ public void testBuildingAndCountingHistogramsWithNullFilter() throws Exception
+ {
+ Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"dimensions\": [],", // fixed typo: was "dimenions" (key was silently ignored)
+ " \"aggregations\": [",
+ " {\"type\": \"longSum\", \"name\": \"count_histogram\", \"fieldName\": "
+ + "\"histogram\"},",
+ " {\"type\": \"doubleSum\", \"name\": \"double_count_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"],",
+ " \"filter\": {\n",
+ " \"fields\": [\n",
+ " {\n",
+ " \"field\": {\n",
+ " \"dimension\": \"histogram\",\n",
+ " \"value\": \"0\",\n",
+ " \"type\": \"selector\"\n",
+ " },\n",
+ " \"type\": \"not\"\n",
+ " },\n",
+ " {\n",
+ " \"field\": {\n",
+ " \"dimension\": \"histogram\",\n",
+ " \"value\": \"\",\n",
+ " \"type\": \"selector\"\n",
+ " },\n",
+ " \"type\": \"not\"\n",
+ " }\n",
+ " ],\n",
+ " \"type\": \"and\"\n",
+ " }",
+ "}"
+ )
+ );
+
+ List<ResultRow> results = seq.toList();
+ Assert.assertEquals(1, results.size());
+ // Check longSum
+ Assert.assertEquals(9L, results.get(0).get(0));
+ // Check doubleSum
+ Assert.assertEquals(9.0, (Double) results.get(0).get(1), 0.001);
+ }
+
+ @Test
+ public void testIngestAsHistogramDistribution() throws Exception
+ {
+ Sequence seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogramDistribution\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"dimenions\": [],",
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"merged_cost_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.add(PercentileBuckets.indexOf(10), 1L);
+ expected.add(PercentileBuckets.indexOf(30), 1L);
+ expected.add(PercentileBuckets.indexOf(40), 3L);
+ expected.add(PercentileBuckets.indexOf(50), 3L);
+ expected.add(PercentileBuckets.indexOf(20000), 1L);
+
+ List<ResultRow> results = seq.toList();
+ Assert.assertEquals(1, results.size());
+ Assert.assertEquals(expected, results.get(0).get(0));
+ }
+
+ @Test
+ public void testIngestHistogramsTimer() throws Exception
+ {
+ Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogramTimer\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ "  \"dimensions\": [],",
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"merged_cost_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.add(PercentileBuckets.indexOf(10), 1L);
+ expected.add(PercentileBuckets.indexOf(30), 1L);
+ expected.add(PercentileBuckets.indexOf(40), 3L);
+ expected.add(PercentileBuckets.indexOf(50), 3L);
+ expected.add(PercentileBuckets.indexOf(20000), 1L);
+
+ List<ResultRow> results = seq.toList();
+ Assert.assertEquals(1, results.size());
+ Assert.assertEquals(expected, results.get(0).get(0));
+ }
+
+ @Test
+ public void testIngestingPreaggregatedHistograms() throws Exception
+ {
+ Object rawseq = timeSeriesHelper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("pre_agg_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"timeseries\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"merged_cost_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.add(PercentileBuckets.indexOf(10), 1L);
+ expected.add(PercentileBuckets.indexOf(30), 1L);
+ expected.add(PercentileBuckets.indexOf(40), 3L);
+ expected.add(PercentileBuckets.indexOf(50), 3L);
+ expected.add(PercentileBuckets.indexOf(20000), 1L);
+
+ Sequence<Result<TimeseriesResultValue>> seq = (Sequence<Result<TimeseriesResultValue>>) rawseq;
+ List<Result<TimeseriesResultValue>> results = seq.toList();
+ Assert.assertEquals(1, results.size());
+ SpectatorHistogram value = (SpectatorHistogram) results.get(0).getValue().getMetric("merged_cost_histogram");
+ Assert.assertEquals(expected, value);
+ }
+
+ @Test
+ public void testMetadataQueryTimer() throws Exception
+ {
+ File segmentDir = tempFolder.newFolder();
+ helper.createIndex(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogramTimer\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ segmentDir,
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ true
+ );
+
+ ObjectMapper mapper = (ObjectMapper) TestHelper.makeJsonMapper();
+ SpectatorHistogramModule module = new SpectatorHistogramModule();
+ module.getJacksonModules().forEach(mod -> mapper.registerModule(mod));
+ IndexIO indexIO = new IndexIO(
+ mapper,
+ new ColumnConfig() {}
+ );
+
+ QueryableIndex index = indexIO.loadIndex(segmentDir);
+
+ SegmentId segmentId = SegmentId.dummy("segmentId");
+ QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
+ METADATA_QR_FACTORY,
+ segmentId,
+ new QueryableIndexSegment(index, segmentId),
+ null
+ );
+
+ SegmentMetadataQuery segmentMetadataQuery = Druids.newSegmentMetadataQueryBuilder()
+ .dataSource("test_datasource")
+ .intervals("2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z")
+ .merge(true)
+ .build();
+ List<SegmentAnalysis> results = runner.run(QueryPlus.wrap(segmentMetadataQuery)).toList();
+ System.out.println(results);
+ Assert.assertEquals(1, results.size());
+ Map<String, ColumnAnalysis> columns = results.get(0).getColumns();
+ Assert.assertNotNull(columns.get("histogram"));
+ Assert.assertEquals("spectatorHistogramTimer", columns.get("histogram").getType());
+ }
+
+ @Test
+ public void testMetadataQueryDistribution() throws Exception
+ {
+ File segmentDir = tempFolder.newFolder();
+ helper.createIndex(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogramDistribution\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ segmentDir,
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ true
+ );
+
+ ObjectMapper mapper = (ObjectMapper) TestHelper.makeJsonMapper();
+ SpectatorHistogramModule module = new SpectatorHistogramModule();
+ module.getJacksonModules().forEach(mod -> mapper.registerModule(mod));
+ IndexIO indexIO = new IndexIO(
+ mapper,
+ new ColumnConfig() { }
+ );
+
+ QueryableIndex index = indexIO.loadIndex(segmentDir);
+
+ SegmentId segmentId = SegmentId.dummy("segmentId");
+ QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
+ METADATA_QR_FACTORY,
+ segmentId,
+ new QueryableIndexSegment(index, segmentId),
+ null
+ );
+
+ SegmentMetadataQuery segmentMetadataQuery = Druids.newSegmentMetadataQueryBuilder()
+ .dataSource("test_datasource")
+ .intervals("2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z")
+ .merge(true)
+ .build();
+ List<SegmentAnalysis> results = runner.run(QueryPlus.wrap(segmentMetadataQuery)).toList();
+ System.out.println(results);
+ Assert.assertEquals(1, results.size());
+ Map<String, ColumnAnalysis> columns = results.get(0).getColumns();
+ Assert.assertNotNull(columns.get("histogram"));
+ Assert.assertEquals("spectatorHistogramDistribution", columns.get("histogram").getType());
+ }
+
+ @Test
+ public void testPercentilePostAggregator() throws Exception
+ {
+ Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0, // minTimestamp
+ Granularities.NONE,
+ 10, // maxRowCount
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ "  \"dimensions\": [],",
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"merged_cost_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"postAggregations\": [",
+ " {\"type\": \"percentileSpectatorHistogram\", \"name\": \"percentileValue\", \"field\": {\"type\": \"fieldAccess\",\"fieldName\": \"merged_cost_histogram\"}"
+ + ", \"percentile\": \"50.0\"},",
+ " {\"type\": \"percentilesSpectatorHistogram\", \"name\": \"percentileValues\", \"field\": {\"type\": \"fieldAccess\",\"fieldName\": \"merged_cost_histogram\"}"
+ + ", \"percentiles\": [25.0, 50.0, 75.0, 99.0]}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+ SpectatorHistogram expected = new SpectatorHistogram();
+ expected.add(PercentileBuckets.indexOf(10), 1L);
+ expected.add(PercentileBuckets.indexOf(30), 1L);
+ expected.add(PercentileBuckets.indexOf(40), 3L);
+ expected.add(PercentileBuckets.indexOf(50), 3L);
+ expected.add(PercentileBuckets.indexOf(20000), 1L);
+
+ List<ResultRow> results = seq.toList();
+ Assert.assertEquals(1, results.size());
+ // Check on Median (true median is 40)
+ Assert.assertEquals(40.0, (double) results.get(0).get(1), 0.2);
+ // True percentiles for 25, 50, 75, 99
+ double[] expectedPercentiles = new double[]{40.0, 40.0, 50.0, 18404.0};
+ double[] resultPercentiles = (double[]) results.get(0).get(2);
+
+ for (int i = 0; i < expectedPercentiles.length; i++) {
+ double expectedPercentile = expectedPercentiles[i];
+ double resultPercentile = resultPercentiles[i];
+ double error18pcnt = expectedPercentile * 0.18;
+ // Should be within 18%
+ Assert.assertEquals(expectedPercentile, resultPercentile, error18pcnt);
+ }
+ }
+
+ private static void assertResultsMatch(List<ResultRow> results, int rowNum, String expectedProduct)
+ {
+ ResultRow row = results.get(rowNum);
+ Object product = row.get(0);
+ Assert.assertTrue("Expected dimension of type String", product instanceof String);
+ Assert.assertEquals("Product values didn't match", expectedProduct, product);
+ Object histogram = row.get(1);
+ Assert.assertTrue(
+ "Expected histogram metric of type SpectatorHistogramUtils.HistogramMap",
+ histogram instanceof SpectatorHistogram
+ );
+ Assert.assertEquals("Count values didn't match", EXPECTED_HISTOGRAMS.get(product), histogram);
+ }
+
+}
diff --git a/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramTest.java b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramTest.java
new file mode 100644
index 000000000000..fb15ac85e4c4
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram;
+
+import com.netflix.spectator.api.histogram.PercentileBuckets;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+public class SpectatorHistogramTest
+{
+ @Test
+ public void testToBytesSmallValues()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.insert(10);
+ histogram.insert(30);
+ histogram.insert(40);
+ histogram.insert(40);
+ histogram.insert(40);
+ histogram.insert(50);
+ histogram.insert(50);
+ // Check the full range of bucket IDs still work
+ long bigValue = PercentileBuckets.get(270);
+ histogram.insert(bigValue);
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, histogram.size());
+ Assert.assertEquals("Should have sum matching number entries", 8, histogram.getSum());
+
+ byte[] bytes = histogram.toBytes();
+ int keySize = Short.BYTES;
+ int valSize = 0;
+ Assert.assertEquals("Should compact small values within key bytes", 5 * (keySize + valSize), bytes.length);
+
+ SpectatorHistogram deserialized = SpectatorHistogram.deserialize(bytes);
+ Assert.assertEquals(1L, deserialized.get(PercentileBuckets.indexOf(10)));
+ Assert.assertEquals(1L, deserialized.get(PercentileBuckets.indexOf(30)));
+ Assert.assertEquals(3L, deserialized.get(PercentileBuckets.indexOf(40)));
+ Assert.assertEquals(2L, deserialized.get(PercentileBuckets.indexOf(50)));
+ Assert.assertEquals(1L, deserialized.get(PercentileBuckets.indexOf(bigValue)));
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, deserialized.size());
+ Assert.assertEquals("Should have sum matching number entries", 8, deserialized.getSum());
+ }
+
+ @Test
+ public void testToBytesSmallishValues()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(10), 64L);
+ histogram.add(PercentileBuckets.indexOf(30), 127L);
+ histogram.add(PercentileBuckets.indexOf(40), 111L);
+ histogram.add(PercentileBuckets.indexOf(50), 99L);
+ histogram.add(270, 100L);
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, histogram.size());
+ Assert.assertEquals("Should have sum matching number entries", 501, histogram.getSum());
+
+ byte[] bytes = histogram.toBytes();
+ int keySize = Short.BYTES;
+ int valSize = Byte.BYTES;
+ Assert.assertEquals("Should compact small values to a byte", 5 * (keySize + valSize), bytes.length);
+
+ SpectatorHistogram deserialized = SpectatorHistogram.deserialize(bytes);
+ Assert.assertEquals(64L, deserialized.get(PercentileBuckets.indexOf(10)));
+ Assert.assertEquals(127L, deserialized.get(PercentileBuckets.indexOf(30)));
+ Assert.assertEquals(111L, deserialized.get(PercentileBuckets.indexOf(40)));
+ Assert.assertEquals(99L, deserialized.get(PercentileBuckets.indexOf(50)));
+ Assert.assertEquals(100L, deserialized.get(270));
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, deserialized.size());
+ Assert.assertEquals("Should have sum matching number entries", 501, deserialized.getSum());
+ }
+
+ @Test
+ public void testToBytesMedValues()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(10), 512L);
+ histogram.add(PercentileBuckets.indexOf(30), 1024L);
+ histogram.add(PercentileBuckets.indexOf(40), 2048L);
+ histogram.add(PercentileBuckets.indexOf(50), 4096L);
+ histogram.add(270, 8192L);
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, histogram.size());
+ Assert.assertEquals("Should have sum matching number entries", 15872, histogram.getSum());
+
+ byte[] bytes = histogram.toBytes();
+ int keySize = Short.BYTES;
+ int valSize = Short.BYTES;
+ Assert.assertEquals("Should compact medium values to short", 5 * (keySize + valSize), bytes.length);
+
+ SpectatorHistogram deserialized = SpectatorHistogram.deserialize(bytes);
+ Assert.assertEquals(512L, deserialized.get(PercentileBuckets.indexOf(10)));
+ Assert.assertEquals(1024L, deserialized.get(PercentileBuckets.indexOf(30)));
+ Assert.assertEquals(2048L, deserialized.get(PercentileBuckets.indexOf(40)));
+ Assert.assertEquals(4096L, deserialized.get(PercentileBuckets.indexOf(50)));
+ Assert.assertEquals(8192L, deserialized.get(270));
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, deserialized.size());
+ Assert.assertEquals("Should have sum matching number entries", 15872, deserialized.getSum());
+ }
+
+ @Test
+ public void testToBytesLargerValues()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(10), 100000L);
+ histogram.add(PercentileBuckets.indexOf(30), 200000L);
+ histogram.add(PercentileBuckets.indexOf(40), 500000L);
+ histogram.add(PercentileBuckets.indexOf(50), 10000000L);
+ histogram.add(270, 50000000L);
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, histogram.size());
+ Assert.assertEquals("Should have sum matching number entries", 60800000, histogram.getSum());
+
+ byte[] bytes = histogram.toBytes();
+ int keySize = Short.BYTES;
+ int valSize = Integer.BYTES;
+ Assert.assertEquals("Should compact larger values to integer", 5 * (keySize + valSize), bytes.length);
+
+ SpectatorHistogram deserialized = SpectatorHistogram.deserialize(bytes);
+ Assert.assertEquals(100000L, deserialized.get(PercentileBuckets.indexOf(10)));
+ Assert.assertEquals(200000L, deserialized.get(PercentileBuckets.indexOf(30)));
+ Assert.assertEquals(500000L, deserialized.get(PercentileBuckets.indexOf(40)));
+ Assert.assertEquals(10000000L, deserialized.get(PercentileBuckets.indexOf(50)));
+ Assert.assertEquals(50000000L, deserialized.get(270));
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, deserialized.size());
+ Assert.assertEquals("Should have sum matching number entries", 60800000, deserialized.getSum());
+ }
+
+ @Test
+ public void testToBytesBiggestValues()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(10), 10000000000L);
+ histogram.add(PercentileBuckets.indexOf(30), 20000000000L);
+ histogram.add(PercentileBuckets.indexOf(40), 50000000000L);
+ histogram.add(PercentileBuckets.indexOf(50), 100000000000L);
+ histogram.add(270, 5000000000000L);
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, histogram.size());
+ Assert.assertEquals("Should have sum matching number entries", 5180000000000L, histogram.getSum());
+
+ byte[] bytes = histogram.toBytes();
+ int keySize = Short.BYTES;
+ int valSize = Long.BYTES;
+ Assert.assertEquals("Should not compact larger values", 5 * (keySize + valSize), bytes.length);
+
+ SpectatorHistogram deserialized = SpectatorHistogram.deserialize(bytes);
+ Assert.assertEquals(10000000000L, deserialized.get(PercentileBuckets.indexOf(10)));
+ Assert.assertEquals(20000000000L, deserialized.get(PercentileBuckets.indexOf(30)));
+ Assert.assertEquals(50000000000L, deserialized.get(PercentileBuckets.indexOf(40)));
+ Assert.assertEquals(100000000000L, deserialized.get(PercentileBuckets.indexOf(50)));
+ Assert.assertEquals(5000000000000L, deserialized.get(270));
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, deserialized.size());
+ Assert.assertEquals("Should have sum matching number entries", 5180000000000L, deserialized.getSum());
+ }
+
+ @Test
+ public void testToBytesMixedValues()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(PercentileBuckets.indexOf(10), 1L);
+ histogram.add(PercentileBuckets.indexOf(30), 300L);
+ histogram.add(PercentileBuckets.indexOf(40), 200000L);
+ histogram.add(PercentileBuckets.indexOf(50), 100000000000L);
+ histogram.add(270, 5000000000000L);
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, histogram.size());
+ Assert.assertEquals("Should have sum matching number entries", 5100000200301L, histogram.getSum());
+
+ byte[] bytes = histogram.toBytes();
+ int keySize = Short.BYTES;
+ Assert.assertEquals("Should not compact larger values", (5 * keySize) + 0 + 2 + 4 + 8 + 8, bytes.length);
+
+ SpectatorHistogram deserialized = SpectatorHistogram.deserialize(bytes);
+ Assert.assertEquals(1L, deserialized.get(PercentileBuckets.indexOf(10)));
+ Assert.assertEquals(300L, deserialized.get(PercentileBuckets.indexOf(30)));
+ Assert.assertEquals(200000L, deserialized.get(PercentileBuckets.indexOf(40)));
+ Assert.assertEquals(100000000000L, deserialized.get(PercentileBuckets.indexOf(50)));
+ Assert.assertEquals(5000000000000L, deserialized.get(270));
+
+ Assert.assertEquals("Should have size matching number of buckets", 5, deserialized.size());
+ Assert.assertEquals("Should have sum matching number entries", 5100000200301L, deserialized.getSum());
+ }
+
+ @Test
+ public void testToBytesBoundaryValues()
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(6, 63L);
+ histogram.add(7, 64L);
+ histogram.add(8, 255L);
+ histogram.add(9, 256L);
+ histogram.add(16, 65535L);
+ histogram.add(17, 65536L);
+ histogram.add(32, 4294967295L);
+ histogram.add(33, 4294967296L);
+
+ Assert.assertEquals("Should have size matching number of buckets", 8, histogram.size());
+ Assert.assertEquals("Should have sum matching number entries", 8590066300L, histogram.getSum());
+
+ byte[] bytes = histogram.toBytes();
+ int keySize = Short.BYTES;
+ Assert.assertEquals("Should compact", (8 * keySize) + 0 + 1 + 1 + 2 + 2 + 4 + 4 + 8, bytes.length);
+
+ SpectatorHistogram deserialized = SpectatorHistogram.deserialize(bytes);
+ Assert.assertEquals(63L, deserialized.get(6));
+ Assert.assertEquals(64L, deserialized.get(7));
+ Assert.assertEquals(255L, deserialized.get(8));
+ Assert.assertEquals(256L, deserialized.get(9));
+ Assert.assertEquals(65535L, deserialized.get(16));
+ Assert.assertEquals(65536L, deserialized.get(17));
+ Assert.assertEquals(4294967295L, deserialized.get(32));
+ Assert.assertEquals(4294967296L, deserialized.get(33));
+
+ Assert.assertEquals("Should have size matching number of buckets", 8, deserialized.size());
+ Assert.assertEquals("Should have sum matching number entries", 8590066300L, deserialized.getSum());
+ }
+
+ @Test(expected = IAE.class)
+ public void testBucketOutOfRangeMax() throws IAE
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(500, 1);
+ }
+
+ @Test(expected = IAE.class)
+ public void testBucketOutOfRangeNegative() throws IAE
+ {
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(-2, 1);
+ }
+
+ @Test
+ public void testSerializeAndDeserialize() throws IOException
+ {
+ SegmentWriteOutMedium medium = new OnHeapMemorySegmentWriteOutMedium();
+ SpectatorHistogramObjectStrategy strategy = new SpectatorHistogramObjectStrategy();
+ SpectatorHistogramSerializer serializer = SpectatorHistogramSerializer.create(medium, "test", strategy);
+ serializer.open();
+
+ SpectatorHistogram histogram = new SpectatorHistogram();
+ histogram.add(6, 63L);
+ histogram.add(7, 64L);
+ histogram.add(8, 255L);
+ histogram.add(9, 256L);
+ histogram.add(16, 65535L);
+ histogram.add(17, 65536L);
+ histogram.add(32, 4294967295L);
+ histogram.add(33, 4294967296L);
+
+ ColumnValueSelector<SpectatorHistogram> selector = new ColumnValueSelector<SpectatorHistogram>()
+ {
+ private int callCount = 0;
+
+ @Override
+ public boolean isNull()
+ {
+ return false;
+ }
+
+ @Override
+ public long getLong()
+ {
+ return 0;
+ }
+
+ @Override
+ public float getFloat()
+ {
+ return 0;
+ }
+
+ @Override
+ public double getDouble()
+ {
+ return 0;
+ }
+
+ @Override
+ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+ {
+
+ }
+
+ @Override
+ public SpectatorHistogram getObject()
+ {
+ // On every 3rd fetch and after 6, we'll return a null.
+ // Columns ending with a lot of nulls won't add to the
+ // size of the segment at all.
+ ++callCount;
+ if ((callCount % 3 == 0) || callCount > 6) {
+ return null;
+ }
+ return histogram;
+ }
+
+ @Override
+ public Class<? extends SpectatorHistogram> classOfObject()
+ {
+ return histogram.getClass();
+ }
+ };
+
+ int count = 0;
+ // Serialize lots of nulls at the end to ensure
+ // we don't waste space on nulls.
+ for (int i = 0; i < 125000; i++) {
+ serializer.serialize(selector);
+ count++;
+ }
+
+ long serializedSize = serializer.getSerializedSize();
+ // Column header = 6 bytes
+ // Offset header (Size + BitmapLength + ValueBitMap + Offsets)
+ // size = 4 bytes
+ // bitmap length = 4 bytes
+ // bitmap = 1 byte
+ // offsets * 4 = 16 bytes (no offset for nulls)
+ // Offset header = 25 bytes
+ // 4 values = 152 bytes
+ // each value = 38 bytes
+ // Total = 6 + 25 + 152 = 183
+ Assert.assertEquals("Expect serialized size", 183L, serializedSize);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final WritableByteChannel channel = Channels.newChannel(baos);
+ serializer.writeTo(channel, null);
+ channel.close();
+
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(baos.toByteArray());
+ Assert.assertEquals(serializer.getSerializedSize(), byteBuffer.remaining());
+ SpectatorHistogramIndexed indexedDeserialized = SpectatorHistogramIndexed.read(byteBuffer, strategy);
+ Assert.assertEquals(0, byteBuffer.remaining());
+
+ Assert.assertEquals("Count of entries should match", count, indexedDeserialized.size());
+
+ for (int i = 0; i < count; i++) {
+ SpectatorHistogram deserialized = indexedDeserialized.get(i);
+ if ((i + 1) % 3 == 0 || i >= 6) {
+ // Expect null
+ Assert.assertNull(deserialized);
+ } else {
+ Assert.assertEquals(63L, deserialized.get(6));
+ Assert.assertEquals(64L, deserialized.get(7));
+ Assert.assertEquals(255L, deserialized.get(8));
+ Assert.assertEquals(256L, deserialized.get(9));
+ Assert.assertEquals(65535L, deserialized.get(16));
+ Assert.assertEquals(65536L, deserialized.get(17));
+ Assert.assertEquals(4294967295L, deserialized.get(32));
+ Assert.assertEquals(4294967296L, deserialized.get(33));
+ }
+ }
+ }
+
+ @Test
+ public void testPercentileComputation0()
+ {
+ SpectatorHistogram h = new SpectatorHistogram();
+ h.insert(0);
+ Assert.assertEquals(0.1, h.getPercentileValue(10.0), 0.01);
+ Assert.assertEquals(0.5, h.getPercentileValue(50.0), 0.01);
+ Assert.assertEquals(0.99, h.getPercentileValue(99.0), 0.01);
+ Assert.assertEquals(1.0, h.getPercentileValue(100.0), 0.01);
+ }
+
+ @Test
+ public void testPercentileComputation1_100()
+ {
+ SpectatorHistogram h = new SpectatorHistogram();
+ for (int i = 0; i < 100; i++) {
+ h.insert(i);
+ }
+ // Precision assigned to half of the bucket width
+ Assert.assertEquals(10.0, h.getPercentileValue(10.0), 0.5);
+ Assert.assertEquals(50.0, h.getPercentileValue(50.0), 2.5);
+ Assert.assertEquals(99.0, h.getPercentileValue(99.0), 10.5);
+ Assert.assertEquals(100.0, h.getPercentileValue(100.0), 10.5);
+ }
+
+ @Test
+ public void testPercentileComputation0_Big()
+ {
+ SpectatorHistogram h = new SpectatorHistogram();
+ // one very small value, 99 very big values
+ h.add(0, 1);
+ h.add(200, 99);
+ long upperBoundOfBucket0 = PercentileBuckets.get(0);
+ long upperBoundOfBucket200 = PercentileBuckets.get(200);
+ long lowerBoundOfBucket200 = PercentileBuckets.get(199);
+ long widthOfBucket = upperBoundOfBucket200 - lowerBoundOfBucket200;
+ // P1 should be pulled towards the very low value
+ // P >1 should be pulled towards the very big value
+ Assert.assertEquals(upperBoundOfBucket0, h.getPercentileValue(1.0), 0.01);
+ Assert.assertEquals(lowerBoundOfBucket200, h.getPercentileValue(50.0), widthOfBucket / 2.0);
+ Assert.assertEquals(upperBoundOfBucket200, h.getPercentileValue(99.0), widthOfBucket / 2.0);
+ Assert.assertEquals(upperBoundOfBucket200, h.getPercentileValue(100.0), widthOfBucket / 2.0);
+ }
+
+ @Test
+ public void testMedianOfSequence()
+ {
+ int[] nums = new int[]{9, 10, 12, 13, 13, 13, 15, 15, 16, 16, 18, 22, 23, 24, 24, 25};
+ SpectatorHistogram h = new SpectatorHistogram();
+
+ for (int num : nums) {
+ h.insert(num);
+ }
+
+ // Expect middle of the "15.5" bucket, which is 18.0
+ int index = PercentileBuckets.indexOf(15);
+ long upperBoundOfFifteenPointFiveBucket = PercentileBuckets.get(index);
+ long lowerBoundOfFifteenPointFiveBucket = PercentileBuckets.get(index - 1);
+ long halfBucketWidth = ((upperBoundOfFifteenPointFiveBucket - lowerBoundOfFifteenPointFiveBucket) / 2);
+ long middleOfFifteenPointFiveBucket = lowerBoundOfFifteenPointFiveBucket + halfBucketWidth;
+
+ Assert.assertEquals(middleOfFifteenPointFiveBucket, h.getPercentileValue(50.0), 0.01);
+ }
+}
diff --git a/extensions-contrib/spectator-histogram/src/test/resources/input_data.tsv b/extensions-contrib/spectator-histogram/src/test/resources/input_data.tsv
new file mode 100644
index 000000000000..9938f51e26b4
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/test/resources/input_data.tsv
@@ -0,0 +1,12 @@
+2016010101 A 10
+2016010101 B 30
+2016010101 B 40
+2016010101 B 40
+2016010101 B 40
+2016010101 B 50
+2016010101 B 50
+2016010101 C 50
+2016010101 C 20000
+2016010101 D
+2016010101 E
+2016010101 F
\ No newline at end of file
diff --git a/extensions-contrib/spectator-histogram/src/test/resources/pre_agg_data.tsv b/extensions-contrib/spectator-histogram/src/test/resources/pre_agg_data.tsv
new file mode 100644
index 000000000000..6a16d6b1c591
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/test/resources/pre_agg_data.tsv
@@ -0,0 +1,6 @@
+2016010101 A {"10":1}
+2016010101 B {"17":1, "19":3, "21":2}
+2016010101 C {"60":1, "21":1}
+2016010101 D {}
+2016010101 E {}
+2016010101 F {}
diff --git a/pom.xml b/pom.xml
index 81cb00bb0cf5..6149c5866db4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -229,6 +229,7 @@
extensions-contrib/opentelemetry-emitter
extensions-contrib/kubernetes-overlord-extensions
extensions-contrib/druid-iceberg-extensions
+ extensions-contrib/spectator-histogram
distribution
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
index 4f82bdcfe69d..93cf75857c30 100755
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
@@ -129,6 +129,11 @@ public class AggregatorUtil
// TDigest sketch aggregators
public static final byte TDIGEST_BUILD_SKETCH_CACHE_TYPE_ID = 0x38;
+ // Spectator histogram aggregators
+ public static final byte SPECTATOR_HISTOGRAM_CACHE_TYPE_ID = 0x39;
+ public static final byte SPECTATOR_HISTOGRAM_DISTRIBUTION_CACHE_TYPE_ID = 0x3A;
+ public static final byte SPECTATOR_HISTOGRAM_TIMER_CACHE_TYPE_ID = 0x3B;
+
public static final byte MEAN_CACHE_TYPE_ID = 0x41;
// ANY aggregator
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java
index f65208bd9069..ed4bbfdc82b5 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java
@@ -66,4 +66,6 @@ public class PostAggregatorIds
public static final byte KLL_FLOATS_SKETCH_TO_QUANTILE_CACHE_TYPE_ID = 42;
public static final byte KLL_FLOATS_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 43;
public static final byte KLL_FLOATS_SKETCH_TO_STRING_CACHE_TYPE_ID = 44;
+ public static final byte SPECTATOR_HISTOGRAM_SKETCH_PERCENTILE_CACHE_TYPE_ID = 45;
+ public static final byte SPECTATOR_HISTOGRAM_SKETCH_PERCENTILES_CACHE_TYPE_ID = 46;
}
diff --git a/website/.spelling b/website/.spelling
index 7561bcec965d..175774e4ac24 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -430,6 +430,7 @@ pluggable
podSpec
postgres
postgresql
+pre-aggregate
pre-aggregated
pre-aggregates
pre-aggregating
@@ -948,6 +949,7 @@ prometheus
Pushgateway
flushPeriod
postAggregator
+postAggregators
quantileFromTDigestSketch
quantilesFromTDigestSketch
tDigestSketch
@@ -2373,3 +2375,12 @@ markUnused
markUsed
segmentId
aggregateMultipleValues
+
+- ../docs/development/extensions-contrib/spectator-histogram.md
+SpectatorHistogram
+PercentileBuckets
+spectatorHistogram
+spectatorHistogramTimer
+spectatorHistogramDistribution
+percentileSpectatorHistogram
+percentilesSpectatorHistogram