From 26592a33c08421dc8672beac17386d8823818336 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 16 Jan 2020 21:02:02 -0800 Subject: [PATCH 1/3] Speed up String first/last aggregators when folding isn't needed. (#9181) * Speed up String first/last aggregators when folding isn't needed. Examines the value column, and disables fold checking via a needsFoldCheck flag if that column can't possibly contain SerializableLongStringPairs. This is helpful because it avoids calling getObject on the value selector when unnecessary; say, because the time selector didn't yield an earlier or later value. * PR comments. * Move fastLooseChop to StringUtils. --- .../druid/java/util/common/StringUtils.java | 35 ++++++++++++ .../java/util/common/StringUtilsTest.java | 28 ++++++++++ .../first/StringFirstAggregator.java | 44 +++++++++------ .../first/StringFirstAggregatorFactory.java | 13 +++-- .../first/StringFirstBufferAggregator.java | 54 +++++++++++++------ .../first/StringFirstLastUtils.java | 29 +++++++++- .../last/StringLastAggregator.java | 44 +++++++++------ .../last/StringLastAggregatorFactory.java | 14 +++-- .../last/StringLastBufferAggregator.java | 54 +++++++++++++------ .../first/StringFirstAggregationTest.java | 8 ++- .../StringFirstBufferAggregatorTest.java | 46 ++++++++++++++-- .../last/StringLastAggregationTest.java | 5 ++ .../last/StringLastBufferAggregatorTest.java | 50 +++++++++++++++-- 13 files changed, 341 insertions(+), 83 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index 7485802ff36a..33a4e3c74d58 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -546,4 +546,39 @@ public static String rpad(String base, Integer len, String pad) return new String(data); } + /** + * Returns the string truncated to maxBytes. + * If given string input is shorter than maxBytes, then it remains the same. + * + * @param s The input string to possibly be truncated + * @param maxBytes The max bytes that string input will be truncated to + * + * @return the string after truncated to maxBytes + */ + @Nullable + public static String chop(@Nullable final String s, final int maxBytes) + { + if (s == null) { + return null; + } else { + // Shorten firstValue to what could fit in maxBytes as UTF-8. + final byte[] bytes = new byte[maxBytes]; + final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes)); + return new String(bytes, 0, len, StandardCharsets.UTF_8); + } + } + + /** + * Shorten "s" to "maxBytes" chars. Fast and loose because these are *chars* not *bytes*. Use + * {@link #chop(String, int)} for slower, but accurate chopping. + */ + @Nullable + public static String fastLooseChop(@Nullable final String s, final int maxBytes) + { + if (s == null || s.length() <= maxBytes) { + return s; + } else { + return s.substring(0, maxBytes); + } + } } diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java index e9f5f214b08b..1d4cb6dbade9 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java @@ -246,4 +246,32 @@ public void testRpad() Assert.assertEquals(s5, null); } + @Test + public void testChop() + { + Assert.assertEquals("foo", StringUtils.chop("foo", 5)); + Assert.assertEquals("fo", StringUtils.chop("foo", 2)); + Assert.assertEquals("", StringUtils.chop("foo", 0)); + Assert.assertEquals("smile 🙂 for", StringUtils.chop("smile 🙂 for the camera", 14)); + Assert.assertEquals("smile 🙂", StringUtils.chop("smile 🙂 for the camera", 10)); + Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 9)); + Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 8)); + Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 7)); + Assert.assertEquals("smile ", StringUtils.chop("smile 🙂 for the camera", 6)); + Assert.assertEquals("smile", StringUtils.chop("smile 🙂 for the camera", 5)); + } + + @Test + public void testFastLooseChop() + { + Assert.assertEquals("foo", StringUtils.fastLooseChop("foo", 5)); + Assert.assertEquals("fo", StringUtils.fastLooseChop("foo", 2)); + Assert.assertEquals("", StringUtils.fastLooseChop("foo", 0)); + Assert.assertEquals("smile 🙂 for", StringUtils.fastLooseChop("smile 🙂 for the camera", 12)); + Assert.assertEquals("smile 🙂 ", StringUtils.fastLooseChop("smile 🙂 for the camera", 9)); + Assert.assertEquals("smile 🙂", StringUtils.fastLooseChop("smile 🙂 for the camera", 8)); + Assert.assertEquals("smile \uD83D", StringUtils.fastLooseChop("smile 🙂 for the camera", 7)); + Assert.assertEquals("smile ", StringUtils.fastLooseChop("smile 🙂 for the camera", 6)); + Assert.assertEquals("smile", StringUtils.fastLooseChop("smile 🙂 for the camera", 5)); + } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java index 02600444f67c..f4a3a2d35255 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java @@ -24,28 +24,29 @@ import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; - -import javax.annotation.Nullable; +import org.apache.druid.segment.DimensionHandlerUtils; public class StringFirstAggregator implements Aggregator { - @Nullable private final BaseLongColumnValueSelector timeSelector; - private final BaseObjectColumnValueSelector valueSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; + private final boolean needsFoldCheck; protected long firstTime; protected String firstValue; public StringFirstAggregator( - @Nullable BaseLongColumnValueSelector timeSelector, - BaseObjectColumnValueSelector valueSelector, - int maxStringBytes + BaseLongColumnValueSelector timeSelector, + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes, + boolean needsFoldCheck ) { this.valueSelector = valueSelector; this.timeSelector = timeSelector; this.maxStringBytes = maxStringBytes; + this.needsFoldCheck = needsFoldCheck; firstTime = DateTimes.MAX.getMillis(); firstValue = null; @@ -54,17 +55,28 @@ public StringFirstAggregator( @Override public void aggregate() { - final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( - timeSelector, - valueSelector - ); + if (needsFoldCheck) { + // Less efficient code path when folding is a possibility (we must read the value selector first just in case + // it's a foldable object). + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); + + if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) { + firstTime = inPair.lhs; + firstValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes); + } + } else { + final long time = timeSelector.getLong(); - if (inPair != null && inPair.rhs != null && inPair.lhs < firstTime) { - firstTime = inPair.lhs; - firstValue = inPair.rhs; + if (time < firstTime) { + final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject()); - if (firstValue.length() > maxStringBytes) { - firstValue = firstValue.substring(0, maxStringBytes); + if (value != null) { + firstTime = time; + firstValue = StringUtils.fastLooseChop(value, maxStringBytes); + } } } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java index 1b30bf716fe2..6ad8558ef0bd 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java @@ -32,6 +32,7 @@ import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnHolder; @@ -118,20 +119,24 @@ public StringFirstAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { + final BaseObjectColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); return new StringFirstAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - metricFactory.makeColumnValueSelector(fieldName), - maxStringBytes + valueSelector, + maxStringBytes, + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { + final BaseObjectColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); return new StringFirstBufferAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - metricFactory.makeColumnValueSelector(fieldName), - maxStringBytes + valueSelector, + maxStringBytes, + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) ); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java index 5a4c00686e21..b7d5ac89b06a 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java @@ -25,6 +25,7 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.DimensionHandlerUtils; import java.nio.ByteBuffer; @@ -36,18 +37,21 @@ public class StringFirstBufferAggregator implements BufferAggregator ); private final BaseLongColumnValueSelector timeSelector; - private final BaseObjectColumnValueSelector valueSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; + private final boolean needsFoldCheck; public StringFirstBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseObjectColumnValueSelector valueSelector, - int maxStringBytes + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes, + boolean needsFoldCheck ) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; this.maxStringBytes = maxStringBytes; + this.needsFoldCheck = needsFoldCheck; } @Override @@ -59,20 +63,40 @@ public void init(ByteBuffer buf, int position) @Override public void aggregate(ByteBuffer buf, int position) { - final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( - timeSelector, - valueSelector - ); + if (needsFoldCheck) { + // Less efficient code path when folding is a possibility (we must read the value selector first just in case + // it's a foldable object). + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (inPair != null && inPair.rhs != null) { + if (inPair != null && inPair.rhs != null) { + final long firstTime = buf.getLong(position); + if (inPair.lhs < firstTime) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(inPair.lhs, inPair.rhs), + maxStringBytes + ); + } + } + } else { + final long time = timeSelector.getLong(); final long firstTime = buf.getLong(position); - if (inPair.lhs < firstTime) { - StringFirstLastUtils.writePair( - buf, - position, - new SerializablePairLongString(inPair.lhs, inPair.rhs), - maxStringBytes - ); + + if (time < firstTime) { + final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject()); + + if (value != null) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(time, value), + maxStringBytes + ); + } } } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 133c4ba1ffcc..7d2cb1dd25a5 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -24,6 +24,9 @@ import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.DimensionHandlerUtils; +import org.apache.druid.segment.NilColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -33,6 +36,30 @@ public class StringFirstLastUtils { private static final int NULL_VALUE = -1; + /** + * Returns whether a given value selector *might* contain SerializablePairLongString objects. + */ + public static boolean selectorNeedsFoldCheck( + final BaseObjectColumnValueSelector valueSelector, + @Nullable final ColumnCapabilities valueSelectorCapabilities + ) + { + if (valueSelectorCapabilities != null && valueSelectorCapabilities.getType() != ValueType.COMPLEX) { + // Known, non-complex type. + return false; + } + + if (valueSelector instanceof NilColumnValueSelector) { + // Nil column, definitely no SerializablePairLongStrings. + return false; + } + + // Check if the selector class could possibly be a SerializablePairLongString (either a superclass or subclass). + final Class clazz = valueSelector.classOfObject(); + return clazz.isAssignableFrom(SerializablePairLongString.class) + || SerializablePairLongString.class.isAssignableFrom(clazz); + } + @Nullable public static String chop(@Nullable final String s, final int maxBytes) { @@ -49,7 +76,7 @@ public static String chop(@Nullable final String s, final int maxBytes) @Nullable public static SerializablePairLongString readPairFromSelectors( final BaseLongColumnValueSelector timeSelector, - final BaseObjectColumnValueSelector valueSelector + final BaseObjectColumnValueSelector valueSelector ) { final long time; diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java index 01be5db49927..34e4179b2dac 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java @@ -25,25 +25,29 @@ import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.DimensionHandlerUtils; public class StringLastAggregator implements Aggregator { private final BaseLongColumnValueSelector timeSelector; - private final BaseObjectColumnValueSelector valueSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; + private final boolean needsFoldCheck; protected long lastTime; protected String lastValue; public StringLastAggregator( - BaseLongColumnValueSelector timeSelector, - BaseObjectColumnValueSelector valueSelector, - int maxStringBytes + final BaseLongColumnValueSelector timeSelector, + final BaseObjectColumnValueSelector valueSelector, + final int maxStringBytes, + final boolean needsFoldCheck ) { this.valueSelector = valueSelector; this.timeSelector = timeSelector; this.maxStringBytes = maxStringBytes; + this.needsFoldCheck = needsFoldCheck; lastTime = DateTimes.MIN.getMillis(); lastValue = null; @@ -52,22 +56,28 @@ public StringLastAggregator( @Override public void aggregate() { - final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( - timeSelector, - valueSelector - ); + if (needsFoldCheck) { + // Less efficient code path when folding is a possibility (we must read the value selector first just in case + // it's a foldable object). + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (inPair == null) { - // Don't aggregate nulls. - return; - } + if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) { + lastTime = inPair.lhs; + lastValue = StringUtils.fastLooseChop(inPair.rhs, maxStringBytes); + } + } else { + final long time = timeSelector.getLong(); - if (inPair != null && inPair.rhs != null && inPair.lhs >= lastTime) { - lastTime = inPair.lhs; - lastValue = inPair.rhs; + if (time >= lastTime) { + final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject()); - if (lastValue.length() > maxStringBytes) { - lastValue = lastValue.substring(0, maxStringBytes); + if (value != null) { + lastTime = time; + lastValue = StringUtils.fastLooseChop(value, maxStringBytes); + } } } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java index 9277d0529dd7..9a3264f1fbae 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java @@ -32,7 +32,9 @@ import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory; +import org.apache.druid.query.aggregation.first.StringFirstLastUtils; import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.BaseObjectColumnValueSelector; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnHolder; @@ -74,20 +76,24 @@ public StringLastAggregatorFactory( @Override public Aggregator factorize(ColumnSelectorFactory metricFactory) { + final BaseObjectColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); return new StringLastAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - metricFactory.makeColumnValueSelector(fieldName), - maxStringBytes + valueSelector, + maxStringBytes, + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) ); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) { + final BaseObjectColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(fieldName); return new StringLastBufferAggregator( metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME), - metricFactory.makeColumnValueSelector(fieldName), - maxStringBytes + valueSelector, + maxStringBytes, + StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName)) ); } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java index 30ea4289939a..09e3276d3d56 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java @@ -26,6 +26,7 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.BaseLongColumnValueSelector; import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.DimensionHandlerUtils; import java.nio.ByteBuffer; @@ -37,18 +38,21 @@ public class StringLastBufferAggregator implements BufferAggregator ); private final BaseLongColumnValueSelector timeSelector; - private final BaseObjectColumnValueSelector valueSelector; + private final BaseObjectColumnValueSelector valueSelector; private final int maxStringBytes; + private final boolean needsFoldCheck; public StringLastBufferAggregator( BaseLongColumnValueSelector timeSelector, - BaseObjectColumnValueSelector valueSelector, - int maxStringBytes + BaseObjectColumnValueSelector valueSelector, + int maxStringBytes, + boolean needsFoldCheck ) { this.timeSelector = timeSelector; this.valueSelector = valueSelector; this.maxStringBytes = maxStringBytes; + this.needsFoldCheck = needsFoldCheck; } @Override @@ -60,20 +64,40 @@ public void init(ByteBuffer buf, int position) @Override public void aggregate(ByteBuffer buf, int position) { - final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( - timeSelector, - valueSelector - ); + if (needsFoldCheck) { + // Less efficient code path when folding is a possibility (we must read the value selector first just in case + // it's a foldable object). + final SerializablePairLongString inPair = StringFirstLastUtils.readPairFromSelectors( + timeSelector, + valueSelector + ); - if (inPair != null && inPair.rhs != null) { + if (inPair != null && inPair.rhs != null) { + final long lastTime = buf.getLong(position); + if (inPair.lhs >= lastTime) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(inPair.lhs, inPair.rhs), + maxStringBytes + ); + } + } + } else { + final long time = timeSelector.getLong(); final long lastTime = buf.getLong(position); - if (inPair.lhs >= lastTime) { - StringFirstLastUtils.writePair( - buf, - position, - new SerializablePairLongString(inPair.lhs, inPair.rhs), - maxStringBytes - ); + + if (time >= lastTime) { + final String value = DimensionHandlerUtils.convertObjectToString(valueSelector.getObject()); + + if (value != null) { + StringFirstLastUtils.writePair( + buf, + position, + new SerializablePairLongString(time, value), + maxStringBytes + ); + } } } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java index 190c3435885b..0e450d32c37d 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java @@ -28,7 +28,9 @@ import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -68,6 +70,9 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); EasyMock.replay(colSelectorFactory); } @@ -133,8 +138,7 @@ public void testStringFirstCombiningAggregator() @Test public void testStringFirstCombiningBufferAggregator() { - BufferAggregator agg = combiningAggFactory.factorizeBuffered( - colSelectorFactory); + BufferAggregator agg = combiningAggFactory.factorizeBuffered(colSelectorFactory); ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]); agg.init(buffer, 0); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java index 04700fae42df..3b4ef692bf52 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java @@ -46,7 +46,6 @@ private void aggregateBuffer( @Test public void testBufferAggregate() { - final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L}; final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"}; Integer maxStringBytes = 1024; @@ -61,7 +60,8 @@ public void testBufferAggregate() StringFirstBufferAggregator agg = new StringFirstBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); @@ -78,7 +78,43 @@ public void testBufferAggregate() Assert.assertEquals("expected last string value", strings[0], sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs)); + } + + @Test + public void testBufferAggregateWithFoldCheck() + { + final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L}; + final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"}; + Integer maxStringBytes = 1024; + + TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps); + TestObjectColumnSelector objectColumnSelector = new TestObjectColumnSelector<>(strings); + + StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory( + "billy", "billy", maxStringBytes + ); + + StringFirstBufferAggregator agg = new StringFirstBufferAggregator( + longColumnSelector, + objectColumnSelector, + maxStringBytes, + true + ); + ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); + int position = 0; + + agg.init(buf, position); + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < timestamps.length; i++) { + aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); + } + + SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); + + + Assert.assertEquals("expected last string value", strings[0], sp.rhs); + Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[0]), new Long(sp.lhs)); } @Test @@ -99,7 +135,8 @@ public void testNullBufferAggregate() StringFirstBufferAggregator agg = new StringFirstBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); @@ -137,7 +174,8 @@ public void testNoStringValue() StringFirstBufferAggregator agg = new StringFirstBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java index 2e22f9cfcf7c..39f9925606ed 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java @@ -28,7 +28,9 @@ import org.apache.druid.query.aggregation.TestLongColumnSelector; import org.apache.druid.query.aggregation.TestObjectColumnSelector; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ValueType; import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; @@ -68,6 +70,9 @@ public void setup() EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector); EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly")) + .andReturn(new ColumnCapabilitiesImpl().setType(ValueType.STRING)); + EasyMock.expect(colSelectorFactory.getColumnCapabilities("billy")).andReturn(null); EasyMock.replay(colSelectorFactory); } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java index 18a378855a6f..6c350c4cff1e 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java @@ -61,7 +61,8 @@ public void testBufferAggregate() StringLastBufferAggregator agg = new StringLastBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); @@ -76,11 +77,48 @@ public void testBufferAggregate() SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); - Assert.assertEquals("expectec last string value", "DDDD", sp.rhs); + Assert.assertEquals("expected last string value", "DDDD", sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs)); } + @Test + public void testBufferAggregateWithFoldCheck() + { + final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L}; + final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"}; + Integer maxStringBytes = 1024; + + TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps); + TestObjectColumnSelector objectColumnSelector = new TestObjectColumnSelector<>(strings); + + StringLastAggregatorFactory factory = new StringLastAggregatorFactory( + "billy", "billy", maxStringBytes + ); + + StringLastBufferAggregator agg = new StringLastBufferAggregator( + longColumnSelector, + objectColumnSelector, + maxStringBytes, + true + ); + + ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); + int position = 0; + + agg.init(buf, position); + //noinspection ForLoopReplaceableByForEach + for (int i = 0; i < timestamps.length; i++) { + aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position); + } + + SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); + + + Assert.assertEquals("expected last string value", "DDDD", sp.rhs); + Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs)); + } + @Test public void testNullBufferAggregate() { @@ -99,7 +137,8 @@ public void testNullBufferAggregate() StringLastBufferAggregator agg = new StringLastBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); @@ -114,7 +153,7 @@ public void testNullBufferAggregate() SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position)); - Assert.assertEquals("expectec last string value", strings[2], sp.rhs); + Assert.assertEquals("expected last string value", strings[2], sp.rhs); Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[2]), new Long(sp.lhs)); } @@ -137,7 +176,8 @@ public void testNonStringValue() StringLastBufferAggregator agg = new StringLastBufferAggregator( longColumnSelector, objectColumnSelector, - maxStringBytes + maxStringBytes, + false ); ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize()); From 274e6856ff06a08cb2c91a3cf476a5f638d29ffc Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Fri, 17 Jan 2020 16:40:45 -0800 Subject: [PATCH 2/3] actually fix conflict correctly --- .../aggregation/first/StringFirstAggregator.java | 3 ++- .../aggregation/first/StringFirstLastUtils.java | 13 ------------- .../aggregation/last/StringLastAggregator.java | 3 ++- 3 files changed, 4 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java index f4a3a2d35255..2d5ee990ed1f 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java @@ -20,6 +20,7 @@ package org.apache.druid.query.aggregation.first; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.segment.BaseLongColumnValueSelector; @@ -84,7 +85,7 @@ public void aggregate() @Override public Object get() { - return new SerializablePairLongString(firstTime, StringFirstLastUtils.chop(firstValue, maxStringBytes)); + return new SerializablePairLongString(firstTime, StringUtils.chop(firstValue, maxStringBytes)); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 7d2cb1dd25a5..517feb788cb9 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -60,19 +60,6 @@ public static boolean selectorNeedsFoldCheck( || SerializablePairLongString.class.isAssignableFrom(clazz); } - @Nullable - public static String chop(@Nullable final String s, final int maxBytes) - { - if (s == null) { - return null; - } else { - // Shorten firstValue to what could fit in maxBytes as UTF-8. - final byte[] bytes = new byte[maxBytes]; - final int len = StringUtils.toUtf8WithLimit(s, ByteBuffer.wrap(bytes)); - return new String(bytes, 0, len, StandardCharsets.UTF_8); - } - } - @Nullable public static SerializablePairLongString readPairFromSelectors( final BaseLongColumnValueSelector timeSelector, diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java index 34e4179b2dac..ea37ff420408 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java @@ -20,6 +20,7 @@ package org.apache.druid.query.aggregation.last; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.SerializablePairLongString; import org.apache.druid.query.aggregation.first.StringFirstLastUtils; @@ -85,7 +86,7 @@ public void aggregate() @Override public Object get() { - return new SerializablePairLongString(lastTime, StringFirstLastUtils.chop(lastValue, maxStringBytes)); + return new SerializablePairLongString(lastTime, StringUtils.chop(lastValue, maxStringBytes)); } @Override From 3bd24813cac93d1ec31e834821cbb42a39f5807c Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Fri, 17 Jan 2020 21:38:38 -0800 Subject: [PATCH 3/3] remove unused import --- .../druid/query/aggregation/first/StringFirstLastUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java index 517feb788cb9..b26877e776ca 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstLastUtils.java @@ -30,7 +30,6 @@ import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; public class StringFirstLastUtils {