From fe34b51e384b6997b6cb711572cfba2f867823e4 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 6 Nov 2019 10:04:27 -0800 Subject: [PATCH] Fixes, adjustments to numeric null handling and string first/last aggregators. There is a class of bugs due to the fact that BaseObjectColumnValueSelector has both "getObject" and "isNull" methods, but in most selector implementations and most call sites, it is clear that the intent of "isNull" is only to apply to the primitive getters, not the object getter. This makes sense, because the purpose of isNull is to enable detection of nulls in otherwise-primitive columns. Imagine a string column with a numeric selector built on top of it. You would want it to return isNull = true, so numeric aggregators don't treat it as all zeroes. Sometimes this design leads people to accidentally guard non-primitive get methods with "selector.isNull" checks, which is improper. This patch has three goals: 1) Fix null-handling bugs that already exist in this class. 2) Make interface and doc changes that reduce the probability of future bugs. 3) Fix other, unrelated bugs I noticed in the stringFirst and stringLast aggregators while fixing null-handling bugs. I thought about splitting this into its own patch, but it ended up being tough to split from the null-handling fixes. For (1) the fixes are, - Fix StringFirst and StringLastAggregatorFactory to stop guarding getObject calls on isNull, by no longer extending NullableAggregatorFactory. Now uses -1 as a sigil value for null, to differentiate nulls and empty strings. - Fix ExpressionFilter to stop guarding getObject calls on isNull. Also, use eval.asBoolean() to avoid calling getLong on the selector after already calling getObject. - Fix ObjectBloomFilterAggregator to stop guarding DimensionSelector calls on isNull. Also, refactored slightly to avoid the overhead of calling getObject followed by another getter (see BloomFilterAggregatorFactory for part of this). For (2) the main changes are, - Remove the "isNull" method from BaseObjectColumnValueSelector. - Clarify "isNull" doc on BaseNullableColumnValueSelector. - Rename NullableAggregatorFactory -> NullbleNumericAggregatorFactory to emphasize that it only works on aggregators that take numbers as input. - Similar naming changes to the Aggregator, BufferAggregator, and AggregateCombiner. - Similar naming changes to helper methods for groupBy, ValueMatchers, etc. For (3) the other fixes for StringFirst and StringLastAggregatorFactory are, - Fixed buffer overrun in the buffer aggregators when some characters in the string code into more than one byte (the old code used "substring" to apply a byte limit, which is bad). I did this by introducing a new StringUtils.toUtf8WithLimit method. - Fixed weird IncrementalIndex logic that led to reading nulls for the timestamp. - Adjusted weird StringFirst/Last logic that worked around the weird IncrementalIndex behavior. - Refactored to share code between the four aggregators. - Improved test coverage. - Made the base stringFirst, stringLast aggregators adaptive, and streamlined the xFold versions into aliases. The adaptiveness is similar to how other aggregators like hyperUnique work. --- .../druid/java/util/common/StringUtils.java | 65 ++++++++-- .../org/apache/druid/math/expr/Evals.java | 20 +-- .../org/apache/druid/math/expr/ExprEval.java | 1 + .../java/util/common/StringUtilsTest.java | 28 ++++- .../bloom/BaseBloomFilterAggregator.java | 7 +- .../bloom/BloomFilterAggregatorFactory.java | 18 ++- .../bloom/BloomFilterMergeAggregator.java | 7 +- .../bloom/ObjectBloomFilterAggregator.java | 26 ++-- .../druid/jackson/AggregatorsModule.java | 2 - .../query/aggregation/AggregatorFactory.java | 8 +- ... => NullableNumericAggregateCombiner.java} | 16 +-- ...or.java => NullableNumericAggregator.java} | 24 ++-- ... => NullableNumericAggregatorFactory.java} | 25 ++-- ...a => NullableNumericBufferAggregator.java} | 14 ++- ...a => NullableNumericVectorAggregator.java} | 12 +- .../SimpleDoubleAggregatorFactory.java | 2 +- .../SimpleFloatAggregatorFactory.java | 2 +- .../SimpleLongAggregatorFactory.java | 2 +- .../first/DoubleFirstAggregatorFactory.java | 4 +- .../first/FloatFirstAggregatorFactory.java | 4 +- .../first/LongFirstAggregatorFactory.java | 4 +- .../first/StringFirstAggregator.java | 43 +++---- .../first/StringFirstAggregatorFactory.java | 38 +++--- .../first/StringFirstBufferAggregator.java | 83 ++++--------- .../StringFirstFoldingAggregatorFactory.java | 81 +------------ .../first/StringFirstLastUtils.java | 114 ++++++++++++++++++ .../last/DoubleLastAggregatorFactory.java | 4 +- .../last/FloatLastAggregatorFactory.java | 4 +- .../last/LongLastAggregatorFactory.java | 4 +- .../last/StringLastAggregator.java | 50 ++++---- .../last/StringLastAggregatorFactory.java | 40 +++--- .../last/StringLastBufferAggregator.java | 84 ++++--------- .../StringLastFoldingAggregatorFactory.java | 78 +----------- ...bleValueMatcherColumnSelectorStrategy.java | 2 +- ...oatValueMatcherColumnSelectorStrategy.java | 2 +- ...ongValueMatcherColumnSelectorStrategy.java | 2 +- .../druid/query/filter/ValueMatcher.java | 7 +- .../epinephelinae/GroupByQueryEngineV2.java | 12 +- ...NumericGroupByColumnSelectorStrategy.java} | 10 +- .../BaseNullableColumnValueSelector.java | 13 +- .../BaseObjectColumnValueSelector.java | 2 +- .../segment/filter/ExpressionFilter.java | 32 +++-- .../segment/incremental/IncrementalIndex.java | 33 +++-- .../segment/virtual/ExpressionSelectors.java | 17 ++- .../first/StringFirstAggregationTest.java | 22 ++-- .../StringFirstBufferAggregatorTest.java | 30 ++--- .../first/StringFirstTimeseriesQueryTest.java | 103 ++++++++++------ .../last/StringLastBufferAggregatorTest.java | 28 ++--- .../last/StringLastTimeseriesQueryTest.java | 106 +++++++++------- .../GroupByQueryQueryToolChestTest.java | 8 +- .../topn/TopNQueryQueryToolChestTest.java | 15 ++- .../segment/filter/ExpressionFilterTest.java | 3 + 52 files changed, 697 insertions(+), 664 deletions(-) rename processing/src/main/java/org/apache/druid/query/aggregation/{NullableAggregateCombiner.java => NullableNumericAggregateCombiner.java} (77%) rename processing/src/main/java/org/apache/druid/query/aggregation/{NullableAggregator.java => NullableNumericAggregator.java} (67%) rename processing/src/main/java/org/apache/druid/query/aggregation/{NullableAggregatorFactory.java => NullableNumericAggregatorFactory.java} (81%) rename processing/src/main/java/org/apache/druid/query/aggregation/{NullableBufferAggregator.java => NullableNumericBufferAggregator.java} (86%) rename processing/src/main/java/org/apache/druid/query/aggregation/{NullableVectorAggregator.java => NullableNumericVectorAggregator.java} (87%) create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java rename processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/{NullableValueGroupByColumnSelectorStrategy.java => NullableNumericGroupByColumnSelectorStrategy.java} (88%) diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index b4520070fe9f..4880c0f6cee2 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -26,7 +26,10 @@ import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.ByteBuffer; +import java.nio.CharBuffer; import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Base64; @@ -107,6 +110,44 @@ public static byte[] toUtf8(final String string) } } + /** + * Encodes "string" into the buffer "byteBuffer", using no more than the number of bytes remaining in the buffer. + * Will only encode whole characters. The byteBuffer's position and limit be changed during operation, but will + * be reset before this method call ends. + * + * @return the number of bytes written, which may be shorter than the full encoded string length if there + * is not enough room in the output buffer. + */ + public static int toUtf8WithLimit(final String string, final ByteBuffer byteBuffer) + { + final CharsetEncoder encoder = StandardCharsets.UTF_8 + .newEncoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE); + + final int originalPosition = byteBuffer.position(); + final int originalLimit = byteBuffer.limit(); + final int maxBytes = byteBuffer.remaining(); + + try { + final char[] chars = string.toCharArray(); + final CharBuffer charBuffer = CharBuffer.wrap(chars); + + // No reason to look at the CoderResult from the "encode" call; we can tell the number of transferred characters + // by looking at the output buffer's position. + encoder.encode(charBuffer, byteBuffer, true); + + final int bytesWritten = byteBuffer.position() - originalPosition; + + assert bytesWritten <= maxBytes; + return bytesWritten; + } + finally { + byteBuffer.position(originalPosition); + byteBuffer.limit(originalLimit); + } + } + @Nullable public static byte[] toUtf8Nullable(@Nullable final String string) { @@ -163,6 +204,7 @@ public static String toUpperCase(String s) * application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form data as well. * * @param s String to be encoded + * * @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20". */ @Nullable @@ -311,6 +353,7 @@ public static String emptyToNullNonDruidDataString(@Nullable String string) * Convert an input to base 64 and return the utf8 string of that byte array * * @param input The string to convert to base64 + * * @return the base64 of the input in string form */ public static String utf8Base64(String input) @@ -322,6 +365,7 @@ public static String utf8Base64(String input) * Convert an input byte array into a newly-allocated byte array using the {@link Base64} encoding scheme * * @param input The byte array to convert to base64 + * * @return the base64 of the input in byte array form */ public static byte[] encodeBase64(byte[] input) @@ -333,6 +377,7 @@ public static byte[] encodeBase64(byte[] input) * Convert an input byte array into a string using the {@link Base64} encoding scheme * * @param input The byte array to convert to base64 + * * @return the base64 of the input in string form */ public static String encodeBase64String(byte[] input) @@ -344,6 +389,7 @@ public static String encodeBase64String(byte[] input) * Decode an input byte array using the {@link Base64} encoding scheme and return a newly-allocated byte array * * @param input The byte array to decode from base64 + * * @return a newly-allocated byte array */ public static byte[] decodeBase64(byte[] input) @@ -355,6 +401,7 @@ public static byte[] decodeBase64(byte[] input) * Decode an input string using the {@link Base64} encoding scheme and return a newly-allocated byte array * * @param input The string to decode from base64 + * * @return a newly-allocated byte array */ public static byte[] decodeBase64String(String input) @@ -411,7 +458,7 @@ public static String repeat(String s, int count) System.arraycopy(multiple, 0, multiple, copied, limit - copied); return new String(multiple, StandardCharsets.UTF_8); } - + /** * Returns the string left-padded with the string pad to a length of len characters. * If str is longer than len, the return value is shortened to len characters. @@ -419,8 +466,9 @@ public static String repeat(String s, int count) * https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala * * @param base The base string to be padded - * @param len The length of padded string - * @param pad The pad string + * @param len The length of padded string + * @param pad The pad string + * * @return the string left-padded with pad to a length of len */ public static String lpad(String base, Integer len, String pad) @@ -453,11 +501,12 @@ public static String lpad(String base, Integer len, String pad) /** * Returns the string right-padded with the string pad to a length of len characters. - * If str is longer than len, the return value is shortened to len characters. + * If str is longer than len, the return value is shortened to len characters. * * @param base The base string to be padded - * @param len The length of padded string - * @param pad The pad string + * @param len The length of padded string + * @param pad The pad string + * * @return the string right-padded with pad to a length of len */ public static String rpad(String base, Integer len, String pad) @@ -473,12 +522,12 @@ public static String rpad(String base, Integer len, String pad) int pos = 0; // Copy the base - for ( ; pos < base.length() && pos < len; pos++) { + for (; pos < base.length() && pos < len; pos++) { data[pos] = base.charAt(pos); } // Copy the padding - for ( ; pos < len; pos += pad.length()) { + for (; pos < len; pos += pad.length()) { for (int i = 0; i < pad.length() && i < len - pos; i++) { data[pos + i] = pad.charAt(i); } diff --git a/core/src/main/java/org/apache/druid/math/expr/Evals.java b/core/src/main/java/org/apache/druid/math/expr/Evals.java index 88cc56a3dc8c..f6e3e4f74c85 100644 --- a/core/src/main/java/org/apache/druid/math/expr/Evals.java +++ b/core/src/main/java/org/apache/druid/math/expr/Evals.java @@ -22,6 +22,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.logger.Logger; +import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; @@ -46,21 +47,6 @@ public static boolean isAllConstants(List exprs) return true; } - // for binary operator not providing constructor of form (String, Expr, Expr), - // you should create it explicitly in here - public static Expr binaryOp(BinaryOpExprBase binary, Expr left, Expr right) - { - try { - return binary.getClass() - .getDeclaredConstructor(String.class, Expr.class, Expr.class) - .newInstance(binary.op, left, right); - } - catch (Exception e) { - log.warn(e, "failed to rewrite expression " + binary); - return binary; // best effort.. keep it working - } - } - public static long asLong(boolean x) { return x ? 1L : 0L; @@ -81,8 +67,8 @@ public static boolean asBoolean(double x) return x > 0; } - public static boolean asBoolean(String x) + public static boolean asBoolean(@Nullable String x) { - return !NullHandling.isNullOrEquivalent(x) && Boolean.valueOf(x); + return !NullHandling.isNullOrEquivalent(x) && Boolean.parseBoolean(x); } } diff --git a/core/src/main/java/org/apache/druid/math/expr/ExprEval.java b/core/src/main/java/org/apache/druid/math/expr/ExprEval.java index de77eb1d99ef..b6f1f58736a8 100644 --- a/core/src/main/java/org/apache/druid/math/expr/ExprEval.java +++ b/core/src/main/java/org/apache/druid/math/expr/ExprEval.java @@ -381,6 +381,7 @@ private static class StringExprEval extends ExprEval private static final StringExprEval OF_NULL = new StringExprEval(null); + @Nullable private Number numericVal; private StringExprEval(@Nullable String value) diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java index 8a1748cc249c..e9f5f214b08b 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java @@ -56,6 +56,32 @@ public void toUtf8ConversionTest() } } + @Test + public void toUtf8WithLimitTest() + { + final ByteBuffer smallBuffer = ByteBuffer.allocate(4); + final ByteBuffer mediumBuffer = ByteBuffer.allocate(6); + final ByteBuffer bigBuffer = ByteBuffer.allocate(8); + + final int smallBufferResult = StringUtils.toUtf8WithLimit("🚀🌔", smallBuffer); + Assert.assertEquals(4, smallBufferResult); + final byte[] smallBufferByteArray = new byte[smallBufferResult]; + smallBuffer.get(smallBufferByteArray); + Assert.assertEquals("🚀", StringUtils.fromUtf8(smallBufferByteArray)); + + final int mediumBufferResult = StringUtils.toUtf8WithLimit("🚀🌔", mediumBuffer); + Assert.assertEquals(4, mediumBufferResult); + final byte[] mediumBufferByteArray = new byte[mediumBufferResult]; + mediumBuffer.get(mediumBufferByteArray); + Assert.assertEquals("🚀", StringUtils.fromUtf8(mediumBufferByteArray)); + + final int bigBufferResult = StringUtils.toUtf8WithLimit("🚀🌔", bigBuffer); + Assert.assertEquals(8, bigBufferResult); + final byte[] bigBufferByteArray = new byte[bigBufferResult]; + bigBuffer.get(bigBufferByteArray); + Assert.assertEquals("🚀🌔", StringUtils.fromUtf8(bigBufferByteArray)); + } + @Test public void fromUtf8ByteBufferHeap() { @@ -181,7 +207,7 @@ public void testRepeat() expectedException.expectMessage("count is negative, -1"); Assert.assertEquals("", StringUtils.repeat("foo", -1)); } - + @Test public void testLpad() { diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java index 48ba08327537..8bc4bd323adc 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterAggregator.java @@ -23,7 +23,6 @@ import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseNullableColumnValueSelector; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -44,10 +43,10 @@ * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and * {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered} * - * @param type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values - * to add to a bloom filter, or other bloom filters to merge into this bloom filter. + * @param type of selector that feeds this aggregator, likely either values to add to a bloom filter, + * or other bloom filters to merge into this bloom filter. */ -public abstract class BaseBloomFilterAggregator +public abstract class BaseBloomFilterAggregator implements BufferAggregator, Aggregator { @Nullable diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java index d158be45a207..28fcaffbbf5e 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterAggregatorFactory.java @@ -35,6 +35,7 @@ import org.apache.druid.query.filter.BloomKFilter; import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ValueType; @@ -279,16 +280,21 @@ private BaseBloomFilterAggregator factorizeInternal(ColumnSelectorFactory column ); } } else { + // No column capabilities, try to guess based on selector type. BaseNullableColumnValueSelector selector = columnFactory.makeColumnValueSelector(field.getDimension()); + if (selector instanceof NilColumnValueSelector) { return new NoopBloomFilterAggregator(maxNumEntries, onHeap); + } else if (selector instanceof DimensionSelector) { + return new StringBloomFilterAggregator((DimensionSelector) selector, maxNumEntries, onHeap); + } else { + // Use fallback 'object' aggregator. + return new ObjectBloomFilterAggregator( + columnFactory.makeColumnValueSelector(field.getDimension()), + maxNumEntries, + onHeap + ); } - // no column capabilities, use fallback 'object' aggregator - return new ObjectBloomFilterAggregator( - columnFactory.makeColumnValueSelector(field.getDimension()), - maxNumEntries, - onHeap - ); } } } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java index 6855d83a35ef..7dd8abaea0b5 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BloomFilterMergeAggregator.java @@ -21,13 +21,14 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import java.nio.ByteBuffer; -public final class BloomFilterMergeAggregator extends BaseBloomFilterAggregator> +public final class BloomFilterMergeAggregator + extends BaseBloomFilterAggregator> { - BloomFilterMergeAggregator(ColumnValueSelector selector, int maxNumEntries, boolean onHeap) + BloomFilterMergeAggregator(BaseObjectColumnValueSelector selector, int maxNumEntries, boolean onHeap) { super(selector, maxNumEntries, onHeap); } diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java index bc97fab800ac..0ad7a179fdb6 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/ObjectBloomFilterAggregator.java @@ -19,20 +19,18 @@ package org.apache.druid.query.aggregation.bloom; -import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.filter.BloomKFilter; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import java.nio.ByteBuffer; /** * Handles "unknown" columns by examining what comes out of the selector */ -class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator +class ObjectBloomFilterAggregator extends BaseBloomFilterAggregator> { ObjectBloomFilterAggregator( - ColumnValueSelector selector, + BaseObjectColumnValueSelector selector, int maxNumEntries, boolean onHeap ) @@ -48,16 +46,14 @@ void bufferAdd(ByteBuffer buf) final ByteBuffer other = (ByteBuffer) object; BloomKFilter.mergeBloomFilterByteBuffers(buf, buf.position(), other, other.position()); } else { - if (NullHandling.replaceWithDefault() || !selector.isNull()) { - if (object instanceof Long) { - BloomKFilter.addLong(buf, selector.getLong()); - } else if (object instanceof Double) { - BloomKFilter.addDouble(buf, selector.getDouble()); - } else if (object instanceof Float) { - BloomKFilter.addFloat(buf, selector.getFloat()); - } else { - StringBloomFilterAggregator.stringBufferAdd(buf, (DimensionSelector) selector); - } + if (object instanceof Long) { + BloomKFilter.addLong(buf, (long) object); + } else if (object instanceof Double) { + BloomKFilter.addDouble(buf, (double) object); + } else if (object instanceof Float) { + BloomKFilter.addFloat(buf, (float) object); + } else if (object instanceof String) { + BloomKFilter.addString(buf, (String) object); } else { BloomKFilter.addBytes(buf, null, 0, 0); } diff --git a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java index aba52a0fd481..55aee0f5fac9 100644 --- a/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/AggregatorsModule.java @@ -67,8 +67,6 @@ import org.apache.druid.query.aggregation.post.LongLeastPostAggregator; import org.apache.druid.segment.serde.ComplexMetrics; -/** - */ public class AggregatorsModule extends SimpleModule { public AggregatorsModule() diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java index f39227de3797..bb23ff952eca 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorFactory.java @@ -39,7 +39,7 @@ * max, sum of metric columns, or cardinality of dimension columns (see {@link * org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory}). * Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged - * to extend {@link NullableAggregatorFactory}. + * to extend {@link NullableNumericAggregatorFactory}. * * Implementations are also expected to correctly handle single/multi value string type columns as it makes sense * for them e.g. doubleSum aggregator tries to parse the string value as double and assumes it to be zero if parsing @@ -106,11 +106,11 @@ public AggregateCombiner makeAggregateCombiner() /** * Creates an {@link AggregateCombiner} which supports nullability. * Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged - * to extend {@link NullableAggregatorFactory} instead of overriding this method. + * to extend {@link NullableNumericAggregatorFactory} instead of overriding this method. * Default implementation calls {@link #makeAggregateCombiner()} for backwards compatibility. * * @see AggregateCombiner - * @see NullableAggregatorFactory + * @see NullableNumericAggregatorFactory */ public AggregateCombiner makeNullableAggregateCombiner() { @@ -221,7 +221,7 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre /** * Returns the maximum size that this aggregator will require in bytes for intermediate storage of results. * Implementations of {@link AggregatorFactory} which need to Support Nullable Aggregations are encouraged - * to extend {@link NullableAggregatorFactory} instead of overriding this method. + * to extend {@link NullableNumericAggregatorFactory} instead of overriding this method. * Default implementation calls {@link #makeAggregateCombiner()} for backwards compatibility. * * @return the maximum number of bytes that an aggregator of this type will require for intermediate result storage. diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregateCombiner.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregateCombiner.java similarity index 77% rename from processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregateCombiner.java rename to processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregateCombiner.java index 4b29d7b0e8d0..5a401eb85a1d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregateCombiner.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregateCombiner.java @@ -20,26 +20,26 @@ package org.apache.druid.query.aggregation; import org.apache.druid.guice.annotations.PublicApi; -import org.apache.druid.segment.BaseNullableColumnValueSelector; import org.apache.druid.segment.ColumnValueSelector; import javax.annotation.Nullable; /** - * The result of a NullableAggregateCombiner will be null if all the values to be combined are null values or no values - * are combined at all. If any of the value is non-null, the result would be the value of the delegate combiner. - * Note that the delegate combiner is not required to perform check for {@link BaseNullableColumnValueSelector#isNull()} - * on the selector as only non-null values will be passed to the delegate combiner. - * This class is only used when SQL compatible null handling is enabled. + * Null-aware numeric {@link AggregateCombiner}. + * + * Used by {@link NullableNumericAggregatorFactory#makeAggregateCombiner()} to wrap non-null aware combiners. This + * class is only used when SQL compatible null handling is enabled. + * + * @see NullableNumericAggregatorFactory#makeAggregateCombiner() */ @PublicApi -public final class NullableAggregateCombiner implements AggregateCombiner +public final class NullableNumericAggregateCombiner implements AggregateCombiner { private boolean isNullResult = true; private final AggregateCombiner delegate; - public NullableAggregateCombiner(AggregateCombiner delegate) + public NullableNumericAggregateCombiner(AggregateCombiner delegate) { this.delegate = delegate; } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java similarity index 67% rename from processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregator.java rename to processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java index 64658b6ca351..33c0c2438fd7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregator.java @@ -21,24 +21,34 @@ import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; import javax.annotation.Nullable; /** - * The result of a NullableAggregator will be null if all the values to be aggregated are null values - * or no values are aggregated at all. If any of the value is non-null, the result would be the aggregated - * value of the delegate aggregator. Note that the delegate aggregator is not required to perform check for - * {@link BaseNullableColumnValueSelector#isNull()} on the selector as only non-null values will be passed - * to the delegate aggregator. This class is only used when SQL compatible null handling is enabled. + * Null-aware numeric {@link Aggregator}. + * + * The result of this aggregator will be null if all the values to be aggregated are null values or no values are + * aggregated at all. If any of the values are non-null, the result will be the aggregated value of the delegate + * aggregator. + * + * When wrapped by this class, the underlying aggregator's required storage space is increased by one byte. The extra + * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before + * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) + * + * Used by {@link NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory)} to wrap non-null aware + * aggregators. This class is only used when SQL compatible null handling is enabled. + * + * @see NullableNumericAggregatorFactory#factorize(ColumnSelectorFactory) */ @PublicApi -public final class NullableAggregator implements Aggregator +public final class NullableNumericAggregator implements Aggregator { private final Aggregator delegate; private final BaseNullableColumnValueSelector selector; private boolean isNullResult = true; - public NullableAggregator(Aggregator delegate, BaseNullableColumnValueSelector selector) + public NullableNumericAggregator(Aggregator delegate, BaseNullableColumnValueSelector selector) { this.delegate = delegate; this.selector = selector; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java similarity index 81% rename from processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregatorFactory.java rename to processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java index e88eac3428e5..d9d66e37599a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericAggregatorFactory.java @@ -30,19 +30,28 @@ import org.apache.druid.segment.vector.VectorValueSelector; /** - * Abstract class with functionality to wrap {@link Aggregator}, {@link BufferAggregator} and {@link AggregateCombiner} - * to support nullable aggregations for SQL compatibility. Implementations of {@link AggregatorFactory} which need to - * Support Nullable Aggregations are encouraged to extend this class. + * Abstract superclass for null-aware numeric aggregators. + * + * Includes functionality to wrap {@link Aggregator}, {@link BufferAggregator}, {@link VectorAggregator}, and + * {@link AggregateCombiner} to support nullable aggregations. The result of this aggregator will be null if all the + * values to be aggregated are null values, or if no values are aggregated at all. If any of the values are non-null, + * the result will be the aggregated value of the non-null values. + * + * This superclass should only be extended by aggregators that read primitive numbers. It implements logic that is + * not valid for non-numeric selector methods such as {@link ColumnValueSelector#getObject()}. + * + * @see BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case */ @ExtensionPoint -public abstract class NullableAggregatorFactory extends AggregatorFactory +public abstract class NullableNumericAggregatorFactory + extends AggregatorFactory { @Override public final Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) { T selector = selector(columnSelectorFactory); Aggregator aggregator = factorize(columnSelectorFactory, selector); - return NullHandling.replaceWithDefault() ? aggregator : new NullableAggregator(aggregator, selector); + return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericAggregator(aggregator, selector); } @Override @@ -50,7 +59,7 @@ public final BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSele { T selector = selector(columnSelectorFactory); BufferAggregator aggregator = factorizeBuffered(columnSelectorFactory, selector); - return NullHandling.replaceWithDefault() ? aggregator : new NullableBufferAggregator(aggregator, selector); + return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericBufferAggregator(aggregator, selector); } @Override @@ -59,14 +68,14 @@ public final VectorAggregator factorizeVector(VectorColumnSelectorFactory column Preconditions.checkState(canVectorize(), "Cannot vectorize"); VectorValueSelector selector = vectorSelector(columnSelectorFactory); VectorAggregator aggregator = factorizeVector(columnSelectorFactory, selector); - return NullHandling.replaceWithDefault() ? aggregator : new NullableVectorAggregator(aggregator, selector); + return NullHandling.replaceWithDefault() ? aggregator : new NullableNumericVectorAggregator(aggregator, selector); } @Override public final AggregateCombiner makeNullableAggregateCombiner() { AggregateCombiner combiner = makeAggregateCombiner(); - return NullHandling.replaceWithDefault() ? combiner : new NullableAggregateCombiner(combiner); + return NullHandling.replaceWithDefault() ? combiner : new NullableNumericAggregateCombiner(combiner); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericBufferAggregator.java similarity index 86% rename from processing/src/main/java/org/apache/druid/query/aggregation/NullableBufferAggregator.java rename to processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericBufferAggregator.java index 0e0d1694b72d..4a8ce6f792a4 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericBufferAggregator.java @@ -23,13 +23,13 @@ import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseNullableColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; /** - * A wrapper around a non-null-aware BufferAggregator that makes it null-aware. This removes the need for each - * aggregator class to handle nulls on its own. + * Null-aware numeric {@link BufferAggregator}. * * The result of this aggregator will be null if all the values to be aggregated are null values or no values are * aggregated at all. If any of the values are non-null, the result will be the aggregated value of the delegate @@ -39,15 +39,19 @@ * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) * - * @see NullableVectorAggregator, the vectorized version. + * Used by {@link NullableNumericAggregatorFactory#factorizeBuffered(ColumnSelectorFactory)} to wrap non-null aware + * aggregators. This class is only used when SQL compatible null handling is enabled. + * + * @see NullableNumericAggregatorFactory#factorizeBuffered(ColumnSelectorFactory) + * @see NullableNumericVectorAggregator the vectorized version. */ @PublicApi -public final class NullableBufferAggregator implements BufferAggregator +public final class NullableNumericBufferAggregator implements BufferAggregator { private final BufferAggregator delegate; private final BaseNullableColumnValueSelector nullSelector; - public NullableBufferAggregator(BufferAggregator delegate, BaseNullableColumnValueSelector nullSelector) + public NullableNumericBufferAggregator(BufferAggregator delegate, BaseNullableColumnValueSelector nullSelector) { this.delegate = delegate; this.nullSelector = nullSelector; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/NullableVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java similarity index 87% rename from processing/src/main/java/org/apache/druid/query/aggregation/NullableVectorAggregator.java rename to processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java index bb8faed98d58..cdc4499f013e 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/NullableVectorAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/NullableNumericVectorAggregator.java @@ -40,9 +40,15 @@ * byte is a boolean that stores whether or not any non-null values have been seen. The extra byte is placed before * the underlying aggregator's normal state. (Buffer layout = [nullability byte] [delegate storage bytes]) * - * @see NullableBufferAggregator, the vectorized version. + * The result of a NullableAggregator will be null if all the values to be aggregated are null values + * or no values are aggregated at all. If any of the value is non-null, the result would be the aggregated + * value of the delegate aggregator. Note that the delegate aggregator is not required to perform check for + * {@link VectorValueSelector#getNullVector()} on the selector as only non-null values will be passed + * to the delegate aggregator. This class is only used when SQL compatible null handling is enabled. + * + * @see NullableNumericBufferAggregator , the vectorized version. */ -public class NullableVectorAggregator implements VectorAggregator +public class NullableNumericVectorAggregator implements VectorAggregator { private final VectorAggregator delegate; private final VectorValueSelector selector; @@ -53,7 +59,7 @@ public class NullableVectorAggregator implements VectorAggregator @Nullable private int[] vAggregationRows = null; - NullableVectorAggregator(VectorAggregator delegate, VectorValueSelector selector) + NullableNumericVectorAggregator(VectorAggregator delegate, VectorValueSelector selector) { this.delegate = delegate; this.selector = selector; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java index f5586d0ff6a7..941b937ee43c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java @@ -46,7 +46,7 @@ * It extends "NullableAggregatorFactory" instead of "NullableAggregatorFactory" * to additionally support aggregation on single/multi value string column types. */ -public abstract class SimpleDoubleAggregatorFactory extends NullableAggregatorFactory +public abstract class SimpleDoubleAggregatorFactory extends NullableNumericAggregatorFactory { protected final String name; @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java index 6c4badf18425..f82f5d3ef36d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleFloatAggregatorFactory.java @@ -39,7 +39,7 @@ import java.util.List; import java.util.Objects; -public abstract class SimpleFloatAggregatorFactory extends NullableAggregatorFactory +public abstract class SimpleFloatAggregatorFactory extends NullableNumericAggregatorFactory { protected final String name; @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java index f0dc0476536a..bbc7b43cd57e 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleLongAggregatorFactory.java @@ -45,7 +45,7 @@ * It extends "NullableAggregatorFactory" instead of "NullableAggregatorFactory" * to additionally support aggregation on single/multi value string column types. */ -public abstract class SimpleLongAggregatorFactory extends NullableAggregatorFactory +public abstract class SimpleLongAggregatorFactory extends NullableNumericAggregatorFactory { protected final String name; @Nullable diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java index 8782e1235cda..eb39e35e4efc 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -45,7 +45,7 @@ import java.util.Map; import java.util.Objects; -public class DoubleFirstAggregatorFactory extends NullableAggregatorFactory +public class DoubleFirstAggregatorFactory extends NullableNumericAggregatorFactory { public static final Comparator> VALUE_COMPARATOR = Comparator.comparingDouble(o -> o.rhs); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java index d7ddb4b3bf5b..4a2d1c12abd0 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -45,7 +45,7 @@ import java.util.Map; import java.util.Objects; -public class FloatFirstAggregatorFactory extends NullableAggregatorFactory +public class FloatFirstAggregatorFactory extends NullableNumericAggregatorFactory { public static final Comparator> VALUE_COMPARATOR = Comparator.comparingDouble(o -> o.rhs); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java index 6289b8c8fbce..399f1fd81617 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -44,7 +44,7 @@ import java.util.List; import java.util.Map; -public class LongFirstAggregatorFactory extends NullableAggregatorFactory +public class LongFirstAggregatorFactory extends NullableNumericAggregatorFactory { public static final Comparator> VALUE_COMPARATOR = Comparator.comparingLong(o -> o.rhs); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java index 84991e9645cb..02600444f67c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java @@ -19,24 +19,26 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import javax.annotation.Nullable; + public class StringFirstAggregator implements Aggregator { - - private final BaseObjectColumnValueSelector valueSelector; + @Nullable private final BaseLongColumnValueSelector timeSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; protected long firstTime; protected String firstValue; public StringFirstAggregator( - BaseLongColumnValueSelector timeSelector, + @Nullable BaseLongColumnValueSelector timeSelector, BaseObjectColumnValueSelector valueSelector, int maxStringBytes ) @@ -45,35 +47,24 @@ public StringFirstAggregator( this.timeSelector = timeSelector; this.maxStringBytes = maxStringBytes; - firstTime = Long.MAX_VALUE; + firstTime = DateTimes.MAX.getMillis(); firstValue = null; } @Override public void aggregate() { - long time = timeSelector.getLong(); - if (time < firstTime) { - firstTime = time; - Object value = valueSelector.getObject(); + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (value != null) { - if (value instanceof String) { - firstValue = (String) value; - } else if (value instanceof SerializablePairLongString) { - firstValue = ((SerializablePairLongString) value).rhs; - } else { - throw new ISE( - "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString", - value.getClass().getName() - ); - } + if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) { + firstTime = inPair.lhs; + firstValue = inPair.rhs; - if (firstValue != null && firstValue.length() > maxStringBytes) { - firstValue = firstValue.substring(0, maxStringBytes); - } - } else { - firstValue = null; + if (firstValue.length() > maxStringBytes) { + firstValue = firstValue.substring(0, maxStringBytes); } } } @@ -81,7 +72,7 @@ public void aggregate() @Override public Object get() { - return new SerializablePairLongString(firstTime, firstValue); + return new SerializablePairLongString(firstTime, StringFirstLastUtils.chop(firstValue, maxStringBytes)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java index cceebc0cd219..983ed9b2202b 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -29,10 +29,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnHolder; @@ -45,7 +43,7 @@ import java.util.Objects; @JsonTypeName("stringFirst") -public class StringFirstAggregatorFactory extends NullableAggregatorFactory +public class StringFirstAggregatorFactory extends AggregatorFactory { public static final int DEFAULT_MAX_STRING_SIZE = 1024; @@ -106,31 +104,27 @@ public StringFirstAggregatorFactory( Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName"); this.name = name; this.fieldName = fieldName; - this.maxStringBytes = maxStringBytes == null ? DEFAULT_MAX_STRING_SIZE : maxStringBytes; + this.maxStringBytes = maxStringBytes == null + ? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE + : maxStringBytes; } @Override - protected BaseObjectColumnValueSelector selector(ColumnSelectorFactory metricFactory) - { - return metricFactory.makeColumnValueSelector(fieldName); - } - - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) + public Aggregator factorize(ColumnSelectorFactory metricFactory) { return new StringFirstAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - selector, + metricFactory.makeColumnValueSelector(fieldName), maxStringBytes ); } @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { return new StringFirstBufferAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - selector, + metricFactory.makeColumnValueSelector(fieldName), maxStringBytes ); } @@ -156,7 +150,7 @@ public AggregateCombiner makeAggregateCombiner() @Override public AggregatorFactory getCombiningFactory() { - return new StringFirstFoldingAggregatorFactory(name, name, maxStringBytes); + return new StringFirstAggregatorFactory(name, name, maxStringBytes); } @Override @@ -234,25 +228,25 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - StringFirstAggregatorFactory that = (StringFirstAggregatorFactory) o; - - return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes; + return maxStringBytes == that.maxStringBytes && + Objects.equals(fieldName, that.fieldName) && + Objects.equals(name, that.name); } @Override public int hashCode() { - return Objects.hash(name, fieldName, maxStringBytes); + return Objects.hash(fieldName, name, maxStringBytes); } @Override public String toString() { return "StringFirstAggregatorFactory{" + - "name='" + name + '\'' + - ", fieldName='" + fieldName + '\'' + - ", maxStringBytes=" + maxStringBytes + '\'' + + "fieldName='" + fieldName + '\'' + + ", name='" + name + '\'' + + ", maxStringBytes=" + maxStringBytes + '}'; } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java index 2c0fb1e6587f..5a4c00686e21 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java @@ -19,8 +19,7 @@ package org.apache.druid.query.aggregation.first; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -31,6 +30,11 @@ public class StringFirstBufferAggregator implements BufferAggregator { + private static final SerializablePairLongString INIT = new SerializablePairLongString( + DateTimes.MAX.getMillis(), + null + ); + private final BaseLongColumnValueSelector timeSelector; private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; @@ -49,79 +53,34 @@ public StringFirstBufferAggregator( @Override public void init(ByteBuffer buf, int position) { - buf.putLong(position, Long.MAX_VALUE); - buf.putInt(position + Long.BYTES, 0); + StringFirstLastUtils.writePair(buf, position, INIT, maxStringBytes); } @Override public void aggregate(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - Object value = valueSelector.getObject(); - - long time = timeSelector.getLong(); - String firstString = null; - - if (value != null) { - if (value instanceof SerializablePairLongString) { - SerializablePairLongString serializablePair = (SerializablePairLongString) value; - time = serializablePair.lhs; - firstString = serializablePair.rhs; - } else if (value instanceof String) { - firstString = (String) value; - } else { - throw new ISE( - "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString", - value.getClass().getName() + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); + + if (inPair != null && inPair.rhs != null) { + final long firstTime = buf.getLong(position); + if (inPair.lhs < firstTime) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(inPair.lhs, inPair.rhs), + maxStringBytes ); } } - - long lastTime = mutationBuffer.getLong(position); - - if (time < lastTime) { - if (firstString != null) { - if (firstString.length() > maxStringBytes) { - firstString = firstString.substring(0, maxStringBytes); - } - - byte[] valueBytes = StringUtils.toUtf8(firstString); - - mutationBuffer.putLong(position, time); - mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); - - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.put(valueBytes); - } else { - mutationBuffer.putLong(position, time); - mutationBuffer.putInt(position + Long.BYTES, 0); - } - } } @Override public Object get(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - Long timeValue = mutationBuffer.getLong(position); - int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES); - - SerializablePairLongString serializablePair; - - if (stringSizeBytes > 0) { - byte[] valueBytes = new byte[stringSizeBytes]; - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.get(valueBytes, 0, stringSizeBytes); - serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes)); - } else { - serializablePair = new SerializablePairLongString(timeValue, null); - } - - return serializablePair; + return StringFirstLastUtils.readPair(buf, position); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java index 1c49a54f9846..6bade403945c 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java @@ -19,85 +19,16 @@ package org.apache.druid.query.aggregation.first; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.SerializablePairLongString; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseObjectColumnValueSelector; -import org.apache.druid.segment.ColumnSelectorFactory; +import com.fasterxml.jackson.annotation.JsonCreator; -import java.nio.ByteBuffer; - -@JsonTypeName("stringFirstFold") +/** + * For backwards compatibility; equivalent to a regular StringFirstAggregatorFactory. + */ public class StringFirstFoldingAggregatorFactory extends StringFirstAggregatorFactory { - public StringFirstFoldingAggregatorFactory( - @JsonProperty("name") String name, - @JsonProperty("fieldName") final String fieldName, - @JsonProperty("maxStringBytes") Integer maxStringBytes - ) + @JsonCreator + public StringFirstFoldingAggregatorFactory(String name, String fieldName, Integer maxStringBytes) { super(name, fieldName, maxStringBytes); } - - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) - { - return new StringFirstAggregator(null, null, maxStringBytes) - { - @Override - public void aggregate() - { - SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); - if (pair != null && pair.lhs < firstTime) { - firstTime = pair.lhs; - firstValue = pair.rhs; - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) - { - return new StringFirstBufferAggregator(null, null, maxStringBytes) - { - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); - - if (pair != null && pair.lhs != null) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - long lastTime = mutationBuffer.getLong(position); - - if (pair.lhs < lastTime) { - mutationBuffer.putLong(position, pair.lhs); - - if (pair.rhs != null) { - byte[] valueBytes = StringUtils.toUtf8(pair.rhs); - - mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.put(valueBytes); - } else { - mutationBuffer.putInt(position + Long.BYTES, 0); - } - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } - } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java new file mode 100644 index 000000000000..133c4ba1ffcc --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.first; + +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.segment.BaseLongColumnValueSelector; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.DimensionHandlerUtils; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +public class StringFirstLastUtils +{ + private static final int NULL_VALUE = -1; + + @Nullable + public static String chop(@Nullable final String s, final int maxBytes) + { + if (s == null) { + return null; + } else { + // Shorten firstValue to what could fit in maxBytes as UTF-8. + final byte[] bytes = new byte[maxBytes]; + final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes)); + return new String(bytes, 0, len, StandardCharsets.UTF_8); + } + } + + @Nullable + public static SerializablePairLongString readPairFromSelectors( + final BaseLongColumnValueSelector timeSelector, + final BaseObjectColumnValueSelector valueSelector + ) + { + final long time; + final String string; + + // Need to read this first (before time), just in case it's a SerializablePairLongString (we don't know; it's + // detected at query time). + final Object object = valueSelector.getObject(); + + if (object instanceof SerializablePairLongString) { + final SerializablePairLongString pair = (SerializablePairLongString) object; + time = pair.lhs; + string = pair.rhs; + } else if (object != null) { + time = timeSelector.getLong(); + string = DimensionHandlerUtils.convertObjectToString(object); + } else { + // Don't aggregate nulls. + return null; + } + + return new SerializablePairLongString(time, string); + } + + public static void writePair( + final ByteBuffer buf, + final int position, + final SerializablePairLongString pair, + final int maxStringBytes + ) + { + ByteBuffer mutationBuffer = buf.duplicate(); + mutationBuffer.position(position); + mutationBuffer.putLong(pair.lhs); + + if (pair.rhs != null) { + mutationBuffer.position(position + Long.BYTES + Integer.BYTES); + mutationBuffer.limit(maxStringBytes); + final int len = StringUtils.toUtf8WithLimit(pair.rhs, mutationBuffer); + mutationBuffer.putInt(position + Long.BYTES, len); + } else { + mutationBuffer.putInt(NULL_VALUE); + } + } + + public static SerializablePairLongString readPair(final ByteBuffer buf, final int position) + { + ByteBuffer copyBuffer = buf.duplicate(); + copyBuffer.position(position); + + Long timeValue = copyBuffer.getLong(); + int stringSizeBytes = copyBuffer.getInt(); + + if (stringSizeBytes >= 0) { + byte[] valueBytes = new byte[stringSizeBytes]; + copyBuffer.get(valueBytes, 0, stringSizeBytes); + return new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes)); + } else { + return new SerializablePairLongString(timeValue, null); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java index 2b343beeb39b..ad90857cc82f 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -47,7 +47,7 @@ import java.util.Map; import java.util.Objects; -public class DoubleLastAggregatorFactory extends NullableAggregatorFactory +public class DoubleLastAggregatorFactory extends NullableNumericAggregatorFactory { private final String fieldName; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java index 16e521629e77..0e42f9fded7d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; @@ -47,7 +47,7 @@ import java.util.Map; import java.util.Objects; -public class FloatLastAggregatorFactory extends NullableAggregatorFactory +public class FloatLastAggregatorFactory extends NullableNumericAggregatorFactory { private final String fieldName; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java index 1b42fe1aaf49..4a00a63dce7d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java @@ -30,7 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; +import org.apache.druid.query.aggregation.NullableNumericAggregatorFactory; import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnSelectorFactory; @@ -46,7 +46,7 @@ import java.util.Map; import java.util.Objects; -public class LongLastAggregatorFactory extends NullableAggregatorFactory +public class LongLastAggregatorFactory extends NullableNumericAggregatorFactory { private final String fieldName; private final String name; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java index 9d93c9c6c1d8..01be5db49927 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java @@ -19,17 +19,17 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; public class StringLastAggregator implements Aggregator { - - private final BaseObjectColumnValueSelector valueSelector; private final BaseLongColumnValueSelector timeSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; protected long lastTime; @@ -45,35 +45,29 @@ public StringLastAggregator( this.timeSelector = timeSelector; this.maxStringBytes = maxStringBytes; - lastTime = Long.MIN_VALUE; + lastTime = DateTimes.MIN.getMillis(); lastValue = null; } @Override public void aggregate() { - long time = timeSelector.getLong(); - if (time >= lastTime) { - lastTime = time; - Object value = valueSelector.getObject(); + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); + + if (inPair == null) { + // Don't aggregate nulls. + return; + } - if (value != null) { - if (value instanceof String) { - lastValue = (String) value; - } else if (value instanceof SerializablePairLongString) { - lastValue = ((SerializablePairLongString) value).rhs; - } else { - throw new ISE( - "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString", - value.getClass().getName() - ); - } + if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) { + lastTime = inPair.lhs; + lastValue = inPair.rhs; - if (lastValue != null && lastValue.length() > maxStringBytes) { - lastValue = lastValue.substring(0, maxStringBytes); - } - } else { - lastValue = null; + if (lastValue.length() > maxStringBytes) { + lastValue = lastValue.substring(0, maxStringBytes); } } } @@ -81,25 +75,25 @@ public void aggregate() @Override public Object get() { - return new SerializablePairLongString(lastTime, lastValue); + return new SerializablePairLongString(lastTime, StringFirstLastUtils.chop(lastValue, maxStringBytes)); } @Override public float getFloat() { - throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()"); + throw new UnsupportedOperationException("StringLastAggregator does not support getFloat()"); } @Override public long getLong() { - throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()"); + throw new UnsupportedOperationException("StringLastAggregator does not support getLong()"); } @Override public double getDouble() { - throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()"); + throw new UnsupportedOperationException("StringLastAggregator does not support getDouble()"); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java index 113a1747702d..b024af022dbd 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java @@ -23,21 +23,19 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.NullableAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; -import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnHolder; import javax.annotation.Nullable; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -45,7 +43,7 @@ import java.util.Objects; @JsonTypeName("stringLast") -public class StringLastAggregatorFactory extends NullableAggregatorFactory +public class StringLastAggregatorFactory extends AggregatorFactory { private final String fieldName; private final String name; @@ -68,27 +66,21 @@ public StringLastAggregatorFactory( } @Override - protected BaseObjectColumnValueSelector selector(ColumnSelectorFactory metricFactory) - { - return metricFactory.makeColumnValueSelector(fieldName); - } - - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) + public Aggregator factorize(ColumnSelectorFactory metricFactory) { return new StringLastAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - selector, + metricFactory.makeColumnValueSelector(fieldName), maxStringBytes ); } @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { return new StringLastBufferAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - selector, + metricFactory.makeColumnValueSelector(fieldName), maxStringBytes ); } @@ -114,7 +106,7 @@ public AggregateCombiner makeAggregateCombiner() @Override public AggregatorFactory getCombiningFactory() { - return new StringLastFoldingAggregatorFactory(name, name, maxStringBytes); + return new StringLastAggregatorFactory(name, name, maxStringBytes); } @Override @@ -159,7 +151,7 @@ public Integer getMaxStringBytes() @Override public List requiredFields() { - return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName); + return ImmutableList.of(ColumnHolder.TIME_COLUMN_NAME, fieldName); } @Override @@ -192,25 +184,25 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - StringLastAggregatorFactory that = (StringLastAggregatorFactory) o; - - return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes; + return maxStringBytes == that.maxStringBytes && + Objects.equals(fieldName, that.fieldName) && + Objects.equals(name, that.name); } @Override public int hashCode() { - return Objects.hash(name, fieldName, maxStringBytes); + return Objects.hash(fieldName, name, maxStringBytes); } @Override public String toString() { - return "StringFirstAggregatorFactory{" + - "name='" + name + '\'' + - ", fieldName='" + fieldName + '\'' + - ", maxStringBytes=" + maxStringBytes + '\'' + + return "StringLastAggregatorFactory{" + + "fieldName='" + fieldName + '\'' + + ", name='" + name + '\'' + + ", maxStringBytes=" + maxStringBytes + '}'; } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java index 9271e5aba726..30ea4289939a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java @@ -19,10 +19,10 @@ package org.apache.druid.query.aggregation.last; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; @@ -31,6 +31,11 @@ public class StringLastBufferAggregator implements BufferAggregator { + private static final SerializablePairLongString INIT = new SerializablePairLongString( + DateTimes.MIN.getMillis(), + null + ); + private final BaseLongColumnValueSelector timeSelector; private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; @@ -49,79 +54,34 @@ public StringLastBufferAggregator( @Override public void init(ByteBuffer buf, int position) { - buf.putLong(position, Long.MIN_VALUE); - buf.putInt(position + Long.BYTES, 0); + StringFirstLastUtils.writePair(buf, position, INIT, maxStringBytes); } @Override public void aggregate(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - Object value = valueSelector.getObject(); - - long time = timeSelector.getLong(); - String lastString = null; - - if (value != null) { - if (value instanceof SerializablePairLongString) { - SerializablePairLongString serializablePair = (SerializablePairLongString) value; - time = serializablePair.lhs; - lastString = serializablePair.rhs; - } else if (value instanceof String) { - lastString = (String) value; - } else { - throw new ISE( - "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString", - value.getClass().getName() + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); + + if (inPair != null && inPair.rhs != null) { + final long lastTime = buf.getLong(position); + if (inPair.lhs >= lastTime) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(inPair.lhs, inPair.rhs), + maxStringBytes ); } } - - long lastTime = mutationBuffer.getLong(position); - - if (time >= lastTime) { - if (lastString != null) { - if (lastString.length() > maxStringBytes) { - lastString = lastString.substring(0, maxStringBytes); - } - - byte[] valueBytes = StringUtils.toUtf8(lastString); - - mutationBuffer.putLong(position, time); - mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); - - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.put(valueBytes); - } else { - mutationBuffer.putLong(position, time); - mutationBuffer.putInt(position + Long.BYTES, 0); - } - } } @Override public Object get(ByteBuffer buf, int position) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - Long timeValue = mutationBuffer.getLong(position); - int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES); - - SerializablePairLongString serializablePair; - - if (stringSizeBytes > 0) { - byte[] valueBytes = new byte[stringSizeBytes]; - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.get(valueBytes, 0, stringSizeBytes); - serializablePair = new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes)); - } else { - serializablePair = new SerializablePairLongString(timeValue, null); - } - - return serializablePair; + return StringFirstLastUtils.readPair(buf, position); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java index 77d4ad3a94b9..7f92f00b4aed 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java @@ -19,82 +19,16 @@ package org.apache.druid.query.aggregation.last; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.query.aggregation.Aggregator; -import org.apache.druid.query.aggregation.BufferAggregator; -import org.apache.druid.query.aggregation.SerializablePairLongString; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.BaseObjectColumnValueSelector; -import org.apache.druid.segment.ColumnSelectorFactory; +import com.fasterxml.jackson.annotation.JsonCreator; -import java.nio.ByteBuffer; - -@JsonTypeName("stringLastFold") +/** + * For backwards compatibility; equivalent to a regular StringLastAggregatorFactory. + */ public class StringLastFoldingAggregatorFactory extends StringLastAggregatorFactory { - public StringLastFoldingAggregatorFactory( - @JsonProperty("name") String name, - @JsonProperty("fieldName") final String fieldName, - @JsonProperty("maxStringBytes") Integer maxStringBytes - ) + @JsonCreator + public StringLastFoldingAggregatorFactory(String name, String fieldName, Integer maxStringBytes) { super(name, fieldName, maxStringBytes); } - - @Override - public Aggregator factorize(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) - { - return new StringLastAggregator(null, null, maxStringBytes) - { - @Override - public void aggregate() - { - SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); - if (pair != null && pair.lhs >= lastTime) { - lastTime = pair.lhs; - lastValue = pair.rhs; - } - } - }; - } - - @Override - public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory, BaseObjectColumnValueSelector selector) - { - return new StringLastBufferAggregator(null, null, maxStringBytes) - { - @Override - public void aggregate(ByteBuffer buf, int position) - { - SerializablePairLongString pair = (SerializablePairLongString) selector.getObject(); - if (pair != null && pair.lhs != null) { - ByteBuffer mutationBuffer = buf.duplicate(); - mutationBuffer.position(position); - - long lastTime = mutationBuffer.getLong(position); - - if (pair.lhs >= lastTime) { - mutationBuffer.putLong(position, pair.lhs); - if (pair.rhs != null) { - byte[] valueBytes = StringUtils.toUtf8(pair.rhs); - - mutationBuffer.putInt(position + Long.BYTES, valueBytes.length); - mutationBuffer.position(position + Long.BYTES + Integer.BYTES); - mutationBuffer.put(valueBytes); - } else { - mutationBuffer.putInt(position + Long.BYTES, 0); - } - } - } - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - inspector.visit("selector", selector); - } - }; - } } diff --git a/processing/src/main/java/org/apache/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java index de50bc5d2560..b2f19f519734 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java @@ -32,7 +32,7 @@ public ValueMatcher makeValueMatcher(final BaseDoubleColumnValueSelector selecto { final Double matchVal = DimensionHandlerUtils.convertObjectToDouble(value); if (matchVal == null) { - return ValueMatcher.nullValueMatcher(selector); + return ValueMatcher.primitiveNullValueMatcher(selector); } final long matchValLongBits = Double.doubleToLongBits(matchVal); diff --git a/processing/src/main/java/org/apache/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java index a37f0d592fbb..980fc32eb053 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java @@ -31,7 +31,7 @@ public ValueMatcher makeValueMatcher(final BaseFloatColumnValueSelector selector { final Float matchVal = DimensionHandlerUtils.convertObjectToFloat(value); if (matchVal == null) { - return ValueMatcher.nullValueMatcher(selector); + return ValueMatcher.primitiveNullValueMatcher(selector); } final int matchValIntBits = Float.floatToIntBits(matchVal); diff --git a/processing/src/main/java/org/apache/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java index 71a59aeae3f7..8ff746c6c6a1 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java @@ -31,7 +31,7 @@ public ValueMatcher makeValueMatcher(final BaseLongColumnValueSelector selector, { final Long matchVal = DimensionHandlerUtils.convertObjectToLong(value); if (matchVal == null) { - return ValueMatcher.nullValueMatcher(selector); + return ValueMatcher.primitiveNullValueMatcher(selector); } final long matchValLong = matchVal; return new ValueMatcher() diff --git a/processing/src/main/java/org/apache/druid/query/filter/ValueMatcher.java b/processing/src/main/java/org/apache/druid/query/filter/ValueMatcher.java index a800ec7e39e7..c45e14aa1e99 100644 --- a/processing/src/main/java/org/apache/druid/query/filter/ValueMatcher.java +++ b/processing/src/main/java/org/apache/druid/query/filter/ValueMatcher.java @@ -37,7 +37,12 @@ public interface ValueMatcher extends HotLoopCallee boolean matches(); // Utility method to match null values. - static ValueMatcher nullValueMatcher(BaseNullableColumnValueSelector selector) + + /** + * Returns a ValueMatcher that matches when the primitive long, double, or float value from {@code selector} + * should be treated as null. + */ + static ValueMatcher primitiveNullValueMatcher(BaseNullableColumnValueSelector selector) { return new ValueMatcher() { diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index df29bf7b3167..fd3213fe9a8e 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -48,7 +48,7 @@ import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorPlus; import org.apache.druid.query.groupby.epinephelinae.column.GroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.LongGroupByColumnSelectorStrategy; -import org.apache.druid.query.groupby.epinephelinae.column.NullableValueGroupByColumnSelectorStrategy; +import org.apache.druid.query.groupby.epinephelinae.column.NullableNumericGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.column.StringGroupByColumnSelectorStrategy; import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine; import org.apache.druid.query.groupby.orderby.DefaultLimitSpec; @@ -350,20 +350,20 @@ public GroupByColumnSelectorStrategy makeColumnSelectorStrategy( return new DictionaryBuildingStringGroupByColumnSelectorStrategy(); } case LONG: - return makeNullableStrategy(new LongGroupByColumnSelectorStrategy()); + return makeNullableNumericStrategy(new LongGroupByColumnSelectorStrategy()); case FLOAT: - return makeNullableStrategy(new FloatGroupByColumnSelectorStrategy()); + return makeNullableNumericStrategy(new FloatGroupByColumnSelectorStrategy()); case DOUBLE: - return makeNullableStrategy(new DoubleGroupByColumnSelectorStrategy()); + return makeNullableNumericStrategy(new DoubleGroupByColumnSelectorStrategy()); default: throw new IAE("Cannot create query type helper from invalid type [%s]", type); } } - private GroupByColumnSelectorStrategy makeNullableStrategy(GroupByColumnSelectorStrategy delegate) + private GroupByColumnSelectorStrategy makeNullableNumericStrategy(GroupByColumnSelectorStrategy delegate) { if (NullHandling.sqlCompatible()) { - return new NullableValueGroupByColumnSelectorStrategy(delegate); + return new NullableNumericGroupByColumnSelectorStrategy(delegate); } else { return delegate; } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java similarity index 88% rename from processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java rename to processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java index 3cbd6b8dc602..663a0dd2b211 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableValueGroupByColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/column/NullableNumericGroupByColumnSelectorStrategy.java @@ -29,11 +29,17 @@ import javax.annotation.Nullable; import java.nio.ByteBuffer; -public class NullableValueGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy +/** + * A wrapper around a numeric {@link GroupByColumnSelectorStrategy} that makes it null-aware. Should only be used + * for numeric strategies, not for string strategies. + * + * @see org.apache.druid.segment.BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case + */ +public class NullableNumericGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy { private final GroupByColumnSelectorStrategy delegate; - public NullableValueGroupByColumnSelectorStrategy(GroupByColumnSelectorStrategy delegate) + public NullableNumericGroupByColumnSelectorStrategy(GroupByColumnSelectorStrategy delegate) { this.delegate = delegate; } diff --git a/processing/src/main/java/org/apache/druid/segment/BaseNullableColumnValueSelector.java b/processing/src/main/java/org/apache/druid/segment/BaseNullableColumnValueSelector.java index ae1199e8ae39..dd838fed71c0 100644 --- a/processing/src/main/java/org/apache/druid/segment/BaseNullableColumnValueSelector.java +++ b/processing/src/main/java/org/apache/druid/segment/BaseNullableColumnValueSelector.java @@ -24,16 +24,19 @@ /** * Null value checking polymorphic "part" of the {@link ColumnValueSelector} interface for primitive values. - * Users of {@link BaseLongColumnValueSelector#getLong()}, {@link BaseDoubleColumnValueSelector#getDouble()} - * and {@link BaseFloatColumnValueSelector#getFloat()} are required to check the nullability of the primitive - * types returned. */ @PublicApi public interface BaseNullableColumnValueSelector { /** - * Returns true if selected primitive value is null for {@link BaseFloatColumnValueSelector}, - * {@link BaseLongColumnValueSelector} and {@link BaseDoubleColumnValueSelector} otherwise false. + * Returns true if the primitive long, double, or float value returned by this selector should be treated as null. + * + * Users of {@link BaseLongColumnValueSelector#getLong()}, {@link BaseDoubleColumnValueSelector#getDouble()} + * and {@link BaseFloatColumnValueSelector#getFloat()} must check this method first, or else they may improperly + * use placeholder values returned by the primitive get methods. + * + * Users of {@link BaseObjectColumnValueSelector#getObject()} should not call this method. Instead, call "getObject" + * and check if it is null. */ @CalledFromHotLoop boolean isNull(); diff --git a/processing/src/main/java/org/apache/druid/segment/BaseObjectColumnValueSelector.java b/processing/src/main/java/org/apache/druid/segment/BaseObjectColumnValueSelector.java index 859f401b57bb..6bd9557e1d11 100644 --- a/processing/src/main/java/org/apache/druid/segment/BaseObjectColumnValueSelector.java +++ b/processing/src/main/java/org/apache/druid/segment/BaseObjectColumnValueSelector.java @@ -31,7 +31,7 @@ * All implementations of this interface MUST also implement {@link ColumnValueSelector}. */ @ExtensionPoint -public interface BaseObjectColumnValueSelector extends BaseNullableColumnValueSelector +public interface BaseObjectColumnValueSelector { @Nullable T getObject(); diff --git a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java index a8dd18fef434..8283fd728e72 100644 --- a/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java +++ b/processing/src/main/java/org/apache/druid/segment/filter/ExpressionFilter.java @@ -63,25 +63,35 @@ public ValueMatcher makeMatcher(final ColumnSelectorFactory factory) @Override public boolean matches() { - if (NullHandling.sqlCompatible() && selector.isNull()) { - return false; - } - ExprEval eval = selector.getObject(); - if (eval == null) { - return false; - } + final ExprEval eval = selector.getObject(); + switch (eval.type()) { case LONG_ARRAY: - Long[] lResult = eval.asLongArray(); + final Long[] lResult = eval.asLongArray(); + if (lResult == null) { + return false; + } + return Arrays.stream(lResult).anyMatch(Evals::asBoolean); + case STRING_ARRAY: - String[] sResult = eval.asStringArray(); + final String[] sResult = eval.asStringArray(); + if (sResult == null) { + return false; + } + return Arrays.stream(sResult).anyMatch(Evals::asBoolean); + case DOUBLE_ARRAY: - Double[] dResult = eval.asDoubleArray(); + final Double[] dResult = eval.asDoubleArray(); + if (dResult == null) { + return false; + } + return Arrays.stream(dResult).anyMatch(Evals::asBoolean); + default: - return Evals.asBoolean(selector.getLong()); + return eval.asBoolean(); } } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index 7d7fcac60eba..5f6c8fce07cb 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -138,12 +138,19 @@ class IncrementalIndexInputRowColumnSelectorFactory implements ColumnSelectorFac public ColumnValueSelector makeColumnValueSelector(final String column) { final String typeName = agg.getTypeName(); - boolean isComplexMetric = + final boolean isComplexMetric = GuavaUtils.getEnumIfPresent(ValueType.class, StringUtils.toUpperCase(typeName)) == null || typeName.equalsIgnoreCase(ValueType.COMPLEX.name()); + + final ColumnValueSelector selector = baseSelectorFactory.makeColumnValueSelector(column); + if (!isComplexMetric || !deserializeComplexMetrics) { - return baseSelectorFactory.makeColumnValueSelector(column); + return selector; } else { + // Wrap selector in a special one that uses ComplexMetricSerde to modify incoming objects. + // For complex aggregators that read from multiple columns, we wrap all of them. This is not ideal but it + // has worked so far. + final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName); if (serde == null) { throw new ISE("Don't know how to handle type[%s]", typeName); @@ -155,31 +162,25 @@ public ColumnValueSelector makeColumnValueSelector(final String column) @Override public boolean isNull() { - return in.get().getMetric(column) == null; + return selector.isNull(); } @Override public long getLong() { - Number metric = in.get().getMetric(column); - assert NullHandling.replaceWithDefault() || metric != null; - return DimensionHandlerUtils.nullToZero(metric).longValue(); + return selector.getLong(); } @Override public float getFloat() { - Number metric = in.get().getMetric(column); - assert NullHandling.replaceWithDefault() || metric != null; - return DimensionHandlerUtils.nullToZero(metric).floatValue(); + return selector.getFloat(); } @Override public double getDouble() { - Number metric = in.get().getMetric(column); - assert NullHandling.replaceWithDefault() || metric != null; - return DimensionHandlerUtils.nullToZero(metric).doubleValue(); + return selector.getDouble(); } @Override @@ -192,6 +193,7 @@ public Class classOfObject() @Override public Object getObject() { + // Here is where the magic happens: read from "in" directly, don't go through the normal "selector". return extractor.extractValue(in.get(), column, agg); } @@ -199,6 +201,7 @@ public Object getObject() public void inspectRuntimeShape(RuntimeShapeInspector inspector) { inspector.visit("in", in); + inspector.visit("selector", selector); inspector.visit("extractor", extractor); } }; @@ -997,7 +1000,10 @@ public Iterator iterator() return iterableWithPostAggregations(null, false).iterator(); } - public Iterable iterableWithPostAggregations(@Nullable final List postAggs, final boolean descending) + public Iterable iterableWithPostAggregations( + @Nullable final List postAggs, + final boolean descending + ) { return () -> { final List dimensions = getDimensions(); @@ -1237,6 +1243,7 @@ interface FactsHolder /** * Get all {@link IncrementalIndexRow} to persist, ordered with {@link Comparator} + * * @return */ Iterable persistIterable(); diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java index 1348029d9980..98cbe36fb156 100644 --- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java +++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java @@ -110,8 +110,8 @@ public Object getObject() ExprEval eval = baseSelector.getObject(); if (eval.isArray()) { return Arrays.stream(eval.asStringArray()) - .map(NullHandling::emptyToNullIfNeeded) - .collect(Collectors.toList()); + .map(NullHandling::emptyToNullIfNeeded) + .collect(Collectors.toList()); } return eval.value(); } @@ -374,15 +374,15 @@ private static Expr.ObjectBinding createBindings( if (nativeType == ValueType.FLOAT) { ColumnValueSelector selector = columnSelectorFactory .makeColumnValueSelector(columnName); - supplier = makeNullableSupplier(selector, selector::getFloat); + supplier = makeNullableNumericSupplier(selector, selector::getFloat); } else if (nativeType == ValueType.LONG) { ColumnValueSelector selector = columnSelectorFactory .makeColumnValueSelector(columnName); - supplier = makeNullableSupplier(selector, selector::getLong); + supplier = makeNullableNumericSupplier(selector, selector::getLong); } else if (nativeType == ValueType.DOUBLE) { ColumnValueSelector selector = columnSelectorFactory .makeColumnValueSelector(columnName); - supplier = makeNullableSupplier(selector, selector::getDouble); + supplier = makeNullableNumericSupplier(selector, selector::getDouble); } else if (nativeType == ValueType.STRING) { supplier = supplierFromDimensionSelector( columnSelectorFactory.makeDimensionSelector(new DefaultDimensionSpec(columnName, columnName)), @@ -419,7 +419,12 @@ private static Expr.ObjectBinding createBindings( } } - private static Supplier makeNullableSupplier( + /** + * Wraps a {@link ColumnValueSelector} and uses it to supply numeric values in a null-aware way. + * + * @see org.apache.druid.segment.BaseNullableColumnValueSelector#isNull() for why this only works in the numeric case + */ + private static Supplier makeNullableNumericSupplier( ColumnValueSelector selector, Supplier supplier ) diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java index c4ae2671bd84..190c3435885b 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java @@ -39,7 +39,7 @@ public class StringFirstAggregationTest { private final Integer MAX_STRING_SIZE = 1024; - private AggregatorFactory stringLastAggFactory; + private AggregatorFactory stringFirstAggFactory; private AggregatorFactory combiningAggFactory; private ColumnSelectorFactory colSelectorFactory; private TestLongColumnSelector timeSelector; @@ -59,8 +59,8 @@ public class StringFirstAggregationTest @Before public void setup() { - stringLastAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE); - combiningAggFactory = stringLastAggFactory.getCombiningFactory(); + stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE); + combiningAggFactory = stringFirstAggFactory.getCombiningFactory(); timeSelector = new TestLongColumnSelector(times); valueSelector = new TestObjectColumnSelector<>(strings); objectSelector = new TestObjectColumnSelector<>(pairs); @@ -72,9 +72,9 @@ public void setup() } @Test - public void testStringLastAggregator() + public void testStringFirstAggregator() { - Aggregator agg = stringLastAggFactory.factorize(colSelectorFactory); + Aggregator agg = stringFirstAggFactory.factorize(colSelectorFactory); aggregate(agg); aggregate(agg); @@ -87,12 +87,12 @@ public void testStringLastAggregator() } @Test - public void testStringLastBufferAggregator() + public void testStringFirstBufferAggregator() { - BufferAggregator agg = stringLastAggFactory.factorizeBuffered( + BufferAggregator agg = stringFirstAggFactory.factorizeBuffered( colSelectorFactory); - ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]); + ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]); agg.init(buffer, 0); aggregate(agg, buffer, 0); @@ -110,11 +110,11 @@ public void testCombine() { SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA"); SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB"); - Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2)); + Assert.assertEquals(pair2, stringFirstAggFactory.combine(pair1, pair2)); } @Test - public void testStringLastCombiningAggregator() + public void testStringFirstCombiningAggregator() { Aggregator agg = combiningAggFactory.factorize(colSelectorFactory); @@ -136,7 +136,7 @@ public void testStringFirstCombiningBufferAggregator() BufferAggregator agg = combiningAggFactory.factorizeBuffered( colSelectorFactory); - ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]); + ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]); agg.init(buffer, 0); aggregate(agg, buffer, 0); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java index 7d3e0566f15c..04700fae42df 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java @@ -27,7 +27,6 @@ import org.junit.Test; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; public class StringFirstBufferAggregatorTest { @@ -65,13 +64,7 @@ public void testBufferAggregate() maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -83,7 +76,7 @@ public void testBufferAggregate() SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); - Assert.assertEquals("expectec last string value", strings[0], sp.rhs); + Assert.assertEquals("expected last string value", strings[0], sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs)); } @@ -109,13 +102,7 @@ public void testNullBufferAggregate() maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -127,12 +114,12 @@ public void testNullBufferAggregate() SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); - Assert.assertEquals("expectec last string value", strings[1], sp.rhs); + Assert.assertEquals("expected last string value", strings[1], sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[1]), new Long(sp.lhs)); } - @Test(expected = IllegalStateException.class) + @Test public void testNoStringValue() { @@ -153,13 +140,7 @@ public void testNoStringValue() maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -167,5 +148,10 @@ public void testNoStringValue() for (int i = 0; i < timestamps.length; i++) { aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); } + + SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); + + Assert.assertEquals(1526724600L, (long) sp.lhs); + Assert.assertEquals("2.0", sp.rhs); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java index 6442002f971a..3ae838442fb2 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.first; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.druid.data.input.MapBasedInputRow; @@ -29,14 +30,21 @@ import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.SerializablePairLongStringSerde; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.apache.druid.segment.incremental.IndexSizeExceededException; +import org.apache.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; +import org.junit.Before; import org.junit.Test; import java.util.Collections; @@ -44,80 +52,99 @@ public class StringFirstTimeseriesQueryTest { + private static final String VISITOR_ID = "visitor_id"; + private static final String CLIENT_TYPE = "client_type"; + private static final String FIRST_CLIENT_TYPE = "first_client_type"; - @Test - public void testTopNWithDistinctCountAgg() throws Exception - { - TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); + private static final DateTime TIME1 = DateTimes.of("2016-03-04T00:00:00.000Z"); + private static final DateTime TIME2 = DateTimes.of("2016-03-04T01:00:00.000Z"); - String visitor_id = "visitor_id"; - String client_type = "client_type"; + private IncrementalIndex incrementalIndex; + private QueryableIndex queryableIndex; + + @Before + public void setUp() throws IndexSizeExceededException + { + final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde(); + ComplexMetrics.registerSerde(serde.getTypeName(), serde); - IncrementalIndex index = new IncrementalIndex.Builder() + incrementalIndex = new IncrementalIndex.Builder() .setIndexSchema( new IncrementalIndexSchema.Builder() .withQueryGranularity(Granularities.SECOND) .withMetrics(new CountAggregatorFactory("cnt")) - .withMetrics(new StringFirstAggregatorFactory( - "last_client_type", "client_type", 1024) - ) + .withMetrics(new StringFirstAggregatorFactory(FIRST_CLIENT_TYPE, CLIENT_TYPE, 1024)) .build() ) .setMaxRowCount(1000) .buildOnheap(); - - DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z"); - long timestamp = time.getMillis(); - - DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z"); - long timestamp1 = time1.getMillis(); - index.add( + incrementalIndex.add( new MapBasedInputRow( - timestamp, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "0", client_type, "iphone") + TIME1, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "iphone") ) ); - index.add( + incrementalIndex.add( new MapBasedInputRow( - timestamp, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "1", client_type, "iphone") + TIME1, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "1", CLIENT_TYPE, "iphone") ) ); - index.add( + incrementalIndex.add( new MapBasedInputRow( - timestamp1, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "0", client_type, "android") + TIME2, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "android") ) ); + queryableIndex = TestIndex.persistRealtimeAndLoadMMapped(incrementalIndex); + } + + @Test + public void testTimeseriesQuery() + { + TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); + + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.ALL_GRAN) .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) .aggregators( - Collections.singletonList( - new StringFirstAggregatorFactory( - "last_client_type", client_type, 1024 - ) + ImmutableList.of( + new StringFirstAggregatorFactory("nonfolding", CLIENT_TYPE, 1024), + new StringFirstAggregatorFactory("folding", FIRST_CLIENT_TYPE, 1024), + new StringFirstAggregatorFactory("nonexistent", "nonexistent", 1024), + new StringFirstAggregatorFactory("numeric", "cnt", 1024) ) ) .build(); - final Iterable> results = - engine.process(query, new IncrementalIndexStorageAdapter(index)).toList(); - List> expectedResults = Collections.singletonList( new Result<>( - time, + TIME1, new TimeseriesResultValue( - ImmutableMap.of("last_client_type", new SerializablePairLongString(timestamp, "iphone")) + ImmutableMap.builder() + .put("nonfolding", new SerializablePairLongString(TIME1.getMillis(), "iphone")) + .put("folding", new SerializablePairLongString(TIME1.getMillis(), "iphone")) + .put("nonexistent", new SerializablePairLongString(DateTimes.MAX.getMillis(), null)) + .put("numeric", new SerializablePairLongString(DateTimes.MAX.getMillis(), null)) + .build() ) ) ); - TestHelper.assertExpectedResults(expectedResults, results); + + final Iterable> iiResults = + engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList(); + + final Iterable> qiResults = + engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList(); + + TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index"); + TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index"); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java index c3de5989119c..18a378855a6f 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java @@ -27,7 +27,6 @@ import org.junit.Test; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; public class StringLastBufferAggregatorTest { @@ -65,13 +64,7 @@ public void testBufferAggregate() maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -109,13 +102,7 @@ public void testNullBufferAggregate() maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -132,8 +119,8 @@ public void testNullBufferAggregate() } - @Test(expected = IllegalStateException.class) - public void testNoStringValue() + @Test + public void testNonStringValue() { final long[] timestamps = {1526724000L, 1526724600L}; @@ -153,13 +140,7 @@ public void testNoStringValue() maxStringBytes ); - String testString = "ZZZZ"; - ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); - buf.putLong(1526728500L); - buf.putInt(testString.length()); - buf.put(testString.getBytes(StandardCharsets.UTF_8)); - int position = 0; agg.init(buf, position); @@ -167,5 +148,10 @@ public void testNoStringValue() for (int i = 0; i < timestamps.length; i++) { aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); } + + SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); + + Assert.assertEquals(1526724600L, (long) sp.lhs); + Assert.assertEquals("2.0", sp.rhs); } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java index 87ec83ce0e51..7765ec4335a8 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.aggregation.last; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.druid.data.input.MapBasedInputRow; @@ -29,14 +30,21 @@ import org.apache.druid.query.Result; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.SerializablePairLongString; +import org.apache.druid.query.aggregation.SerializablePairLongStringSerde; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQueryEngine; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter; +import org.apache.druid.segment.incremental.IndexSizeExceededException; +import org.apache.druid.segment.serde.ComplexMetrics; import org.joda.time.DateTime; +import org.junit.Before; import org.junit.Test; import java.util.Collections; @@ -44,83 +52,99 @@ public class StringLastTimeseriesQueryTest { + private static final String VISITOR_ID = "visitor_id"; + private static final String CLIENT_TYPE = "client_type"; + private static final String LAST_CLIENT_TYPE = "last_client_type"; - @Test - public void testTopNWithDistinctCountAgg() throws Exception - { - TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); + private static final DateTime TIME1 = DateTimes.of("2016-03-04T00:00:00.000Z"); + private static final DateTime TIME2 = DateTimes.of("2016-03-04T01:00:00.000Z"); - String visitor_id = "visitor_id"; - String client_type = "client_type"; + private IncrementalIndex incrementalIndex; + private QueryableIndex queryableIndex; + + @Before + public void setUp() throws IndexSizeExceededException + { + final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde(); + ComplexMetrics.registerSerde(serde.getTypeName(), serde); - IncrementalIndex index = new IncrementalIndex.Builder() + incrementalIndex = new IncrementalIndex.Builder() .setIndexSchema( new IncrementalIndexSchema.Builder() .withQueryGranularity(Granularities.SECOND) .withMetrics(new CountAggregatorFactory("cnt")) - .withMetrics(new StringLastAggregatorFactory( - "last_client_type", "client_type", 1024) - ) + .withMetrics(new StringLastAggregatorFactory(LAST_CLIENT_TYPE, CLIENT_TYPE, 1024)) .build() ) .setMaxRowCount(1000) .buildOnheap(); - - DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z"); - long timestamp = time.getMillis(); - - DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z"); - long timestamp1 = time1.getMillis(); - index.add( + incrementalIndex.add( new MapBasedInputRow( - timestamp, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "0", client_type, "iphone") + TIME1, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "iphone") ) ); - index.add( + incrementalIndex.add( new MapBasedInputRow( - timestamp, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "1", client_type, "iphone") + TIME1, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "1", CLIENT_TYPE, "iphone") ) ); - index.add( + incrementalIndex.add( new MapBasedInputRow( - timestamp1, - Lists.newArrayList(visitor_id, client_type), - ImmutableMap.of(visitor_id, "0", client_type, "android") + TIME2, + Lists.newArrayList(VISITOR_ID, CLIENT_TYPE), + ImmutableMap.of(VISITOR_ID, "0", CLIENT_TYPE, "android") ) ); + queryableIndex = TestIndex.persistRealtimeAndLoadMMapped(incrementalIndex); + } + + @Test + public void testTimeseriesQuery() + { + TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); + + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() .dataSource(QueryRunnerTestHelper.DATA_SOURCE) .granularity(QueryRunnerTestHelper.ALL_GRAN) .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC) .aggregators( - Collections.singletonList( - new StringLastAggregatorFactory( - "last_client_type", client_type, 1024 - ) + ImmutableList.of( + new StringLastAggregatorFactory("nonfolding", CLIENT_TYPE, 1024), + new StringLastAggregatorFactory("folding", LAST_CLIENT_TYPE, 1024), + new StringLastAggregatorFactory("nonexistent", "nonexistent", 1024), + new StringLastAggregatorFactory("numeric", "cnt", 1024) ) ) .build(); - final Iterable> results = - engine.process(query, new IncrementalIndexStorageAdapter(index)).toList(); - List> expectedResults = Collections.singletonList( new Result<>( - time, + TIME1, new TimeseriesResultValue( - ImmutableMap.of( - "last_client_type", - new SerializablePairLongString(timestamp1, "android") - ) + ImmutableMap.builder() + .put("nonfolding", new SerializablePairLongString(TIME2.getMillis(), "android")) + .put("folding", new SerializablePairLongString(TIME2.getMillis(), "android")) + .put("nonexistent", new SerializablePairLongString(DateTimes.MIN.getMillis(), null)) + .put("numeric", new SerializablePairLongString(DateTimes.MIN.getMillis(), null)) + .build() ) ) ); - TestHelper.assertExpectedResults(expectedResults, results); + + final Iterable> iiResults = + engine.process(query, new IncrementalIndexStorageAdapter(incrementalIndex)).toList(); + + final Iterable> qiResults = + engine.process(query, new QueryableIndexStorageAdapter(queryableIndex)).toList(); + + TestHelper.assertExpectedResults(expectedResults, iiResults, "incremental index"); + TestHelper.assertExpectedResults(expectedResults, qiResults, "queryable index"); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java index cf59d7ec4ef5..c461c7e774ac 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java @@ -540,7 +540,13 @@ public void testMultiColumnCacheStrategy() throws Exception ); // test timestamps that result in integer size millis - final ResultRow result1 = ResultRow.of(123L, "val1", "fooval1", 1, getIntermediateComplexValue(ValueType.STRING, "val1")); + final ResultRow result1 = ResultRow.of( + 123L, + "val1", + "fooval1", + 1, + getIntermediateComplexValue(ValueType.STRING, "val1") + ); Object preparedValue = strategy.prepareForSegmentLevelCache().apply(result1); diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java index aca6b749a1b1..fc889fd325ca 100644 --- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java +++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java @@ -150,8 +150,10 @@ public void testComputeCacheKeyWithDifferentPostAgg() ).getCacheStrategy(query2); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); - Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), - strategy2.computeResultLevelCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); } @Test @@ -234,8 +236,10 @@ public void testComputeResultLevelCacheKeyWithDifferentPostAgg() //segment level cache key excludes postaggregates in topn Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertFalse(Arrays.equals(strategy1.computeCacheKey(query1), strategy1.computeResultLevelCacheKey(query1))); - Assert.assertFalse(Arrays.equals(strategy1.computeResultLevelCacheKey(query1), - strategy2.computeResultLevelCacheKey(query2))); + Assert.assertFalse(Arrays.equals( + strategy1.computeResultLevelCacheKey(query1), + strategy2.computeResultLevelCacheKey(query2) + )); } @Test @@ -323,7 +327,8 @@ private HyperLogLogCollector getIntermediateHllCollector(final ValueType valueTy collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong((Long) dimValue).asBytes()); break; case DOUBLE: - collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits((Double) dimValue)).asBytes()); + collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits((Double) dimValue)) + .asBytes()); break; case FLOAT: collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits((Float) dimValue)).asBytes()); diff --git a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java index 286b1b632d51..c3fb6aefc26b 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/ExpressionFilterTest.java @@ -188,7 +188,10 @@ public void testOneFloatColumn() public void testConstantExpression() { assertFilterMatchesSkipVectorize(edf("1 + 1"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")); + assertFilterMatchesSkipVectorize(edf("'true'"), ImmutableList.of("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")); + assertFilterMatchesSkipVectorize(edf("0 + 0"), ImmutableList.of()); + assertFilterMatchesSkipVectorize(edf("'false'"), ImmutableList.of()); } @Test