Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions extensions-contrib/spectator-histogram/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@
<artifactId>error_prone_annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-sql</artifactId>
Expand Down Expand Up @@ -137,5 +142,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,12 @@

package org.apache.druid.spectator.histogram;

import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ComplexColumn;
import org.apache.druid.segment.data.ReadableOffset;

import javax.annotation.Nullable;

public class SpectatorHistogramIndexBasedComplexColumn implements ComplexColumn
{
private final SpectatorHistogramIndexed index;
private final String typeName;
private static final Number ZERO = 0;

public SpectatorHistogramIndexBasedComplexColumn(String typeName, SpectatorHistogramIndexed index)
{
Expand Down Expand Up @@ -59,72 +53,11 @@ public Object getRowValue(int rowNum)
@Override
public int getLength()
{
return index.size();
return -1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to claim a length of -1 here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is specced as returning the serialized size of the column in bytes, or -1 if unknown. index.size() returns a row count, which doesn't match the specced behavior. The SpectatorHistogramIndexed doesn't seem to know its own serialized size, and the getLength() method doesn't seem to be used anywhere important, so I figured changing this to -1 was a good idea.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not important then -1 seems fine.
We can very likely compute (or estimate an upper bound for) the size of the column if that would optimize something elsewhere.

}

@Override
public void close()
{
}

/**
 * Builds a selector over this column that reads the row addressed by {@code offset} each time a value is requested.
 * The selector serves both object access ({@link SpectatorHistogram}) and primitive numeric access; the numeric
 * getters fall back to {@code ZERO} when the current row holds no histogram.
 *
 * @param offset position provider; {@code offset.getOffset()} is re-read on every {@code getObject()} call
 * @return a selector exposing the current row as a histogram or as a number
 */
@Override
public ColumnValueSelector<SpectatorHistogram> makeColumnValueSelector(ReadableOffset offset)
{
// Use ColumnValueSelector directly so that we support being queried as a Number using
// longSum or doubleSum aggregators, the NullableNumericBufferAggregator will call isNull.
// This allows us to behave as a Number or SpectatorHistogram object.
// When queried as a Number, we're returning the count of entries in the histogram.
// As such, we can safely return 0 where the histogram is null.
return new ColumnValueSelector<SpectatorHistogram>()
{
// A row is null exactly when no histogram is stored at the current offset.
@Override
public boolean isNull()
{
return getObject() == null;
}

// Returns the current row's histogram viewed as a Number, or ZERO for a null row, so the
// primitive getters below never dereference null.
private Number getOrZero()
{
SpectatorHistogram histogram = getObject();
return histogram != null ? histogram : ZERO;
}

@Override
public long getLong()
{
return getOrZero().longValue();
}

@Override
public float getFloat()
{
return getOrZero().floatValue();
}

@Override
public double getDouble()
{
return getOrZero().doubleValue();
}

// Looks the row up on every call (no caching) using the offset's position at call time.
@Nullable
@Override
public SpectatorHistogram getObject()
{
return (SpectatorHistogram) getRowValue(offset.getOffset());
}

// Delegates to the enclosing column's getClazz().
@Override
public Class classOfObject()
{
return getClazz();
}

// Reports the enclosing column instance to the inspector under the name "column".
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
inspector.visit("column", SpectatorHistogramIndexBasedComplexColumn.this);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@
package org.apache.druid.spectator.histogram;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.netflix.spectator.api.histogram.PercentileBuckets;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.NoopInputRowParser;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Druids;
Expand All @@ -32,6 +38,9 @@
import org.apache.druid.query.aggregation.AggregationTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
Expand All @@ -42,13 +51,17 @@
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -59,6 +72,7 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -716,6 +730,59 @@ public void testPercentilePostAggregator() throws Exception
}
}

@Test
public void testBuildingAndCountingHistogramsIncrementalIndex() throws Exception
{
List<String> dimensions = Collections.singletonList("d");
int n = 10;
DateTime startOfDay = DateTimes.of("2000-01-01");
List<InputRow> inputRows = new ArrayList<>(n);
for (int i = 1; i <= n; i++) {
String val = String.valueOf(i * 1.0d);

inputRows.add(new MapBasedInputRow(
startOfDay.plusMinutes(i),
dimensions,
ImmutableMap.of("x", i, "d", val)
));
}

IncrementalIndex index = AggregationTestHelper.createIncrementalIndex(
inputRows.iterator(),
new NoopInputRowParser(null),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new SpectatorHistogramAggregatorFactory("histogram", "x")
},
0,
Granularities.NONE,
100,
false
);

ImmutableList<Segment> segments = ImmutableList.of(
new IncrementalIndexSegment(index, SegmentId.dummy("test")),
helper.persistIncrementalIndex(index, null)
);

GroupByQuery query = new GroupByQuery.Builder()
.setDataSource("test")
.setGranularity(Granularities.HOUR)
.setInterval("1970/2050")
.setAggregatorSpecs(
new DoubleSumAggregatorFactory("doubleSum", "histogram")
).build();

Sequence<ResultRow> seq = helper.runQueryOnSegmentsObjs(segments, query);

List<ResultRow> results = seq.toList();
Assert.assertEquals(1, results.size());
// Check timestamp
Assert.assertEquals(startOfDay.getMillis(), results.get(0).get(0));
// Check doubleSum
Assert.assertEquals(n * segments.size(), (Double) results.get(0).get(1), 0.001);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be just n regardless of how many segments there are. This should be the count of original input values, however they're spread across segments.
There happens to be a single segment here, so it doesn't affect the test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the * segments.size() is required for this test to pass (it's 2 here). This was the new test; I think what's happening in this test is the same data is added twice:

    ImmutableList<Segment> segments = ImmutableList.of(
        new IncrementalIndexSegment(index, SegmentId.dummy("test")),
        helper.persistIncrementalIndex(index, null)
    );

Since index has a full copy of the dataset, the query over both segments sees every row twice (a double-count).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, that's my bad then. Do we need both:

new IncrementalIndexSegment(index, SegmentId.dummy("test")), helper.persistIncrementalIndex(index, null)

for this test to be meaningful?
I think I'd intended to have just 1 incremental segment. Likely a copy/paste issue from another test.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to have both the in-memory and the persisted indexes in the test. This way we can test the case where the query hits both types of indexes.

}

private static void assertResultsMatch(List<ResultRow> results, int rowNum, String expectedProduct)
{
ResultRow row = results.get(rowNum);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.spectator.histogram;

import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;

public class SpectatorHistogramIndexBasedComplexColumnTest
{
  /**
   * Checks the metadata accessors of the column: the type name is the one handed to the constructor and the
   * reported length is -1.
   */
  @Test
  public void testComplexColumn()
  {
    final String typeName = "type";

    // The mock goes straight into replay mode with no expectations recorded on it.
    final SpectatorHistogramIndexed index = EasyMock.createMock(SpectatorHistogramIndexed.class);
    EasyMock.replay(index);

    final SpectatorHistogramIndexBasedComplexColumn complexColumn =
        new SpectatorHistogramIndexBasedComplexColumn(typeName, index);

    Assert.assertEquals(typeName, complexColumn.getTypeName());
    Assert.assertEquals(-1, complexColumn.getLength());

    EasyMock.verify(index);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@
import org.apache.druid.math.expr.Expr;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DoubleColumnSelector;
import org.apache.druid.segment.FloatColumnSelector;
import org.apache.druid.segment.LongColumnSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.virtual.ExpressionSelectors;
Expand Down Expand Up @@ -428,4 +432,26 @@ public static Supplier<byte[]> getSimpleAggregatorCacheKeySupplier(
.array();
});
}

/**
* Whether a simple numeric aggregator should use {@link BaseObjectColumnValueSelector#getObject()}, and coerce the
* result to number, rather than using a primitive method like {@link BaseLongColumnValueSelector#getLong()}.
*
* @param fieldName field name, or null if the aggregator is expression-based
* @param columnSelectorFactory column selector factory
*/
public static boolean shouldUseObjectColumnAggregatorWrapper(
    @Nullable final String fieldName,
    final ColumnSelectorFactory columnSelectorFactory
)
{
  // A null field name means the aggregator is expression-based: there is no column to inspect.
  if (fieldName == null) {
    return false;
  }

  final ColumnCapabilities columnCapabilities = columnSelectorFactory.getColumnCapabilities(fieldName);

  // STRING values can be coerced to numbers; COMPLEX values may themselves be Numbers (or be
  // coercible to one), so both are read through getObject() rather than a primitive getter.
  final boolean stringTyped = Types.is(columnCapabilities, ValueType.STRING);
  return stringTyped || Types.is(columnCapabilities, ValueType.COMPLEX);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/**
* An Aggregator that delegates everything. It is used by Aggregator wrappers e.g.
* {@link StringColumnDoubleAggregatorWrapper} that modify some behavior of a delegate.
* {@link ObjectColumnDoubleAggregatorWrapper} that modify some behavior of a delegate.
*/
public abstract class DelegatingAggregator implements Aggregator
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* A BufferAggregator that delegates everything. It is used by BufferAggregator wrappers e.g.
* {@link StringColumnDoubleBufferAggregatorWrapper} that modify some behavior of a delegate.
* {@link ObjectColumnDoubleBufferAggregatorWrapper} that modify some behavior of a delegate.
*/
public abstract class DelegatingBufferAggregator implements BufferAggregator
{
Expand Down
Loading