From f2eabcd4efb7ac4898cca365aeb3205fc73935e9 Mon Sep 17 00:00:00 2001 From: imply-cheddar Date: Thu, 17 Nov 2022 11:53:21 +0900 Subject: [PATCH] Persist legacy LatestPairs for now We added compression to the latest/first pair storage, but the code change was forcing new things to be persisted with the new format, meaning that any segment created with the new code cannot be read by the old code. Instead, we need to default to creating the old format and then remove that default in a future version. --- .../CompressedBigDecimalColumn.java | 7 +- ...ompressedBigDecimalColumnPartSupplier.java | 53 ++++--- .../CompressedBigDecimalColumnTest.java | 3 +- .../columnar/ComplexFrameColumnReader.java | 2 +- ...zablePairLongStringComplexMetricSerde.java | 101 ++++++++++++- .../druid/query/metadata/SegmentAnalyzer.java | 14 +- .../druid/segment/column/ComplexColumn.java | 2 +- .../GenericIndexedBasedComplexColumn.java | 2 +- .../column/UnknownTypeComplexColumn.java | 2 +- .../CompressedNestedDataComplexColumn.java | 2 +- ...ePairLongStringComplexMetricSerdeTest.java | 134 ++++++++++-------- ...a => SingleObjectColumnValueSelector.java} | 15 +- 12 files changed, 220 insertions(+), 117 deletions(-) rename processing/src/test/java/org/apache/druid/query/aggregation/{SingleValueColumnValueSelector.java => SingleObjectColumnValueSelector.java} (83%) diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java index b6d7d029be4e..a366675dc035 100644 --- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java +++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java @@ -30,7 +30,6 @@ import org.apache.druid.segment.data.ReadableOffset; import javax.annotation.Nullable; - import java.io.IOException; /** @@ -40,6 +39,7 @@ public class CompressedBigDecimalColumn implements ComplexColumn { public static final Logger LOGGER = new Logger(CompressedBigDecimalColumn.class); + private final int length; private final ColumnarInts scale; private final ColumnarMultiInts magnitude; @@ -49,8 +49,9 @@ public class CompressedBigDecimalColumn implements ComplexColumn * @param scale scale of the rows * @param magnitude LongColumn representing magnitudes */ - public CompressedBigDecimalColumn(ColumnarInts scale, ColumnarMultiInts magnitude) + public CompressedBigDecimalColumn(int length, ColumnarInts scale, ColumnarMultiInts magnitude) { + this.length = length; this.scale = scale; this.magnitude = magnitude; } @@ -87,7 +88,7 @@ public CompressedBigDecimal getRowValue(int rowNum) @Override public int getLength() { - return scale.size(); + return length; } @Override diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java index 0b58827e8a79..c51fbc3384e2 100644 --- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java +++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java @@ -26,6 +26,7 @@ import org.apache.druid.segment.column.ComplexColumn; import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier; import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSupplier; + import java.nio.ByteBuffer; /** @@ -33,27 +34,8 @@ */ public class CompressedBigDecimalColumnPartSupplier implements Supplier { - public static final int VERSION = 0x1; - private final CompressedVSizeColumnarIntsSupplier scaleSupplier; - private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier; - - /** - * Constructor. - * - * @param scaleSupplier scale supplier - * @param magnitudeSupplier supplied of results - */ - public CompressedBigDecimalColumnPartSupplier( - CompressedVSizeColumnarIntsSupplier scaleSupplier, - V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier - ) - { - this.scaleSupplier = scaleSupplier; - this.magnitudeSupplier = magnitudeSupplier; - } - /** * Compressed. * @@ -67,23 +49,50 @@ public static CompressedBigDecimalColumnPartSupplier fromByteBuffer( byte versionFromBuffer = buffer.get(); if (versionFromBuffer == VERSION) { + int positionStart = buffer.position(); CompressedVSizeColumnarIntsSupplier scaleSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( buffer, - IndexIO.BYTE_ORDER); + IndexIO.BYTE_ORDER + ); V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier = V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER); - return new CompressedBigDecimalColumnPartSupplier(scaleSupplier, magnitudeSupplier); + return new CompressedBigDecimalColumnPartSupplier( + buffer.position() - positionStart, + scaleSupplier, + magnitudeSupplier + ); } else { throw new IAE("Unknown version[%s]", versionFromBuffer); } } + private final int byteSize; + private final CompressedVSizeColumnarIntsSupplier scaleSupplier; + private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier; + + /** + * Constructor. + * + * @param scaleSupplier scale supplier + * @param magnitudeSupplier supplied of results + */ + public CompressedBigDecimalColumnPartSupplier( + int byteSize, + CompressedVSizeColumnarIntsSupplier scaleSupplier, + V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier + ) + { + this.byteSize = byteSize; + this.scaleSupplier = scaleSupplier; + this.magnitudeSupplier = magnitudeSupplier; + } + @Override public ComplexColumn get() { - return new CompressedBigDecimalColumn(scaleSupplier.get(), magnitudeSupplier.get()); + return new CompressedBigDecimalColumn(byteSize, scaleSupplier.get(), magnitudeSupplier.get()); } } diff --git a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java index b779c6318021..d9a3f6e97141 100644 --- a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java +++ b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java @@ -35,6 +35,7 @@ public void testCompressedBigDecimalColumn() ColumnarInts columnarInts = EasyMock.createMock(ColumnarInts.class); ReadableOffset readableOffset = EasyMock.createMock(ReadableOffset.class); CompressedBigDecimalColumn compressedBigDecimalColumn = new CompressedBigDecimalColumn( + 12345, columnarInts, columnarMultiInts ); @@ -42,7 +43,7 @@ public void testCompressedBigDecimalColumn() CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL, compressedBigDecimalColumn.getTypeName() ); - Assert.assertEquals(0, compressedBigDecimalColumn.getLength()); + Assert.assertEquals(12345, compressedBigDecimalColumn.getLength()); Assert.assertEquals(CompressedBigDecimalColumn.class, compressedBigDecimalColumn.getClazz()); Assert.assertNotNull(compressedBigDecimalColumn.makeColumnValueSelector(readableOffset)); } diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java index 1f6396badd29..f7b662d42bb2 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java @@ -175,7 +175,7 @@ public Object getRowValue(int rowNum) @Override public int getLength() { - return frame.numRows(); + return (int) frame.numBytes(); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java index deb1623701c0..2cc60843f9e7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java @@ -21,6 +21,7 @@ import org.apache.druid.collections.SerializablePair; import org.apache.druid.data.input.InputRow; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.column.ColumnBuilder; import org.apache.druid.segment.data.GenericIndexed; @@ -28,6 +29,7 @@ import org.apache.druid.segment.serde.ComplexColumnPartSupplier; import org.apache.druid.segment.serde.ComplexMetricExtractor; import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; @@ -49,6 +51,20 @@ */ public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricSerde { + /** + * This is a configuration parameter to allow for turning on compression. It is a hack, it would be significantly + * better if this could be delivered via properties. The number one reason this is a hack is because it reads + * the System.getProperty which doesn't actually have runtime.properties files put into it, so this setting + * could be set in runtime.properties and this code wouldn't see it, because that's not how it is wired up. + * + * The intent of this parameter is so that Druid 25 can be released using the legacy serialization format. This + * will allow us to get code released that can *read* both the legacy and the new format. Then, in Druid 26, + * we can completely eliminate this boolean and start to only *write* the new format, in which case this + * hack of a configuration property disappears. + */ + private static final boolean COMPRESSION_ENABLED = + Boolean.parseBoolean(System.getProperty("druid.columns.pairLongString.compressed", "false")); + public static final int EXPECTED_VERSION = 3; public static final String TYPE_NAME = "serializablePairLongString"; // Null SerializablePairLongString values are put first @@ -60,6 +76,17 @@ public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricS private static final SerializablePairLongStringSimpleStagedSerde SERDE = new SerializablePairLongStringSimpleStagedSerde(); + private final boolean compressionEnabled; + + public SerializablePairLongStringComplexMetricSerde() + { + this(COMPRESSION_ENABLED); + } + + public SerializablePairLongStringComplexMetricSerde(boolean compressionEnabled) + { + this.compressionEnabled = compressionEnabled; + } @Override public String getTypeName() @@ -92,7 +119,7 @@ public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder) byte version = buffer.get(buffer.position()); if (version == 0 || version == 1 || version == 2) { - GenericIndexed column = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper()); + GenericIndexed column = GenericIndexed.read(buffer, LEGACY_STRATEGY, columnBuilder.getFileMapper()); columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column)); } else { SerializablePairLongStringComplexColumn.Builder builder = @@ -141,9 +168,73 @@ public byte[] toBytes(SerializablePairLongString val) @Override public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) { - return new SerializablePairLongStringColumnSerializer( - segmentWriteOutMedium, - NativeClearedByteBufferProvider.INSTANCE - ); + if (compressionEnabled) { + return new SerializablePairLongStringColumnSerializer( + segmentWriteOutMedium, + NativeClearedByteBufferProvider.INSTANCE + ); + } else { + return LargeColumnSupportedComplexColumnSerializer.create( + segmentWriteOutMedium, + column, + LEGACY_STRATEGY + ); + } } + + private static final ObjectStrategy LEGACY_STRATEGY = + new ObjectStrategy() + { + @Override + public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2) + { + return COMPARATOR.compare(o1, o2); + } + + @Override + public Class getClazz() + { + return SerializablePairLongString.class; + } + + @Override + public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + + long lhs = readOnlyBuffer.getLong(); + int stringSize = readOnlyBuffer.getInt(); + + String lastString = null; + if (stringSize > 0) { + byte[] stringBytes = new byte[stringSize]; + readOnlyBuffer.get(stringBytes, 0, stringSize); + lastString = StringUtils.fromUtf8(stringBytes); + } + + return new SerializablePairLongString(lhs, lastString); + } + + @Override + public byte[] toBytes(SerializablePairLongString val) + { + String rhsString = val.rhs; + ByteBuffer bbuf; + + if (rhsString != null) { + byte[] rhsBytes = StringUtils.toUtf8(rhsString); + bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length); + bbuf.putLong(val.lhs); + bbuf.putInt(Long.BYTES, rhsBytes.length); + bbuf.position(Long.BYTES + Integer.BYTES); + bbuf.put(rhsBytes); + } else { + bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); + bbuf.putLong(val.lhs); + bbuf.putInt(Long.BYTES, 0); + } + + return bbuf.array(); + } + }; } diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java index 88cc5dcb8fe5..041d8566d35a 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java @@ -96,7 +96,7 @@ public Map analyze(Segment segment) final StorageAdapter storageAdapter = segment.asStorageAdapter(); // get length and column names from storageAdapter - final int length = storageAdapter.getNumRows(); + final int numRows = storageAdapter.getNumRows(); // Use LinkedHashMap to preserve column order. final Map columns = new LinkedHashMap<>(); @@ -119,13 +119,13 @@ public Map analyze(Segment segment) final int bytesPerRow = ColumnHolder.TIME_COLUMN_NAME.equals(columnName) ? NUM_BYTES_IN_TIMESTAMP : Long.BYTES; - analysis = analyzeNumericColumn(capabilities, length, bytesPerRow); + analysis = analyzeNumericColumn(capabilities, numRows, bytesPerRow); break; case FLOAT: - analysis = analyzeNumericColumn(capabilities, length, NUM_BYTES_IN_TEXT_FLOAT); + analysis = analyzeNumericColumn(capabilities, numRows, NUM_BYTES_IN_TEXT_FLOAT); break; case DOUBLE: - analysis = analyzeNumericColumn(capabilities, length, Double.BYTES); + analysis = analyzeNumericColumn(capabilities, numRows, Double.BYTES); break; case STRING: if (index != null) { @@ -136,7 +136,7 @@ public Map analyze(Segment segment) break; case COMPLEX: final ColumnHolder columnHolder = index != null ? index.getColumnHolder(columnName) : null; - analysis = analyzeComplexColumn(capabilities, columnHolder); + analysis = analyzeComplexColumn(capabilities, numRows, columnHolder); break; default: log.warn("Unknown column type[%s].", capabilities.asTypeString()); @@ -330,6 +330,7 @@ public Long accumulate(Long accumulated, Cursor cursor) private ColumnAnalysis analyzeComplexColumn( @Nullable final ColumnCapabilities capabilities, + final int numCells, @Nullable final ColumnHolder columnHolder ) { @@ -362,8 +363,7 @@ private ColumnAnalysis analyzeComplexColumn( ); } - final int length = complexColumn.getLength(); - for (int i = 0; i < length; ++i) { + for (int i = 0; i < numCells; ++i) { size += inputSizeFn.apply(complexColumn.getRowValue(i)); } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java index 0122f1e53577..b0f3a4388c83 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java @@ -58,7 +58,7 @@ public interface ComplexColumn extends BaseColumn Object getRowValue(int rowNum); /** - * @return serialized size (in bytes) of this column. + * @return serialized size (in bytes) of this column. -1 for unknown */ int getLength(); diff --git a/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java index f3ec1341173a..4b1654d88843 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java @@ -58,7 +58,7 @@ public Object getRowValue(int rowNum) @Override public int getLength() { - return index.size(); + return -1; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java index ce4db70438ef..3f11b779eaf7 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java @@ -59,7 +59,7 @@ public Object getRowValue(int rowNum) @Override public int getLength() { - return 0; + return -1; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java index e062d1e94ce2..c4682d20711b 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java @@ -281,7 +281,7 @@ public int getMaxVectorSize() @Override public int getLength() { - return 0; + return -1; } @Override diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java index c8605f4695b8..f503874a4d6b 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java @@ -20,55 +20,50 @@ package org.apache.druid.query.aggregation; import com.google.common.collect.ImmutableList; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.column.ColumnBuilder; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ComplexColumn; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.serde.cell.RandomStringUtils; import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes; import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Random; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicReference; public class SerializablePairLongStringComplexMetricSerdeTest { - private static final SerializablePairLongStringComplexMetricSerde COMPLEX_METRIC_SERDE = + static { + NullHandling.initializeForTests(); + } + + private static final SerializablePairLongStringComplexMetricSerde LEGACY_SERDE = new SerializablePairLongStringComplexMetricSerde(); + private static final SerializablePairLongStringComplexMetricSerde COMPRESSED_SERDE = + new SerializablePairLongStringComplexMetricSerde(true); // want deterministic test input private final Random random = new Random(0); private final RandomStringUtils randomStringUtils = new RandomStringUtils(random); - private GenericColumnSerializer serializer; - - @SuppressWarnings("unchecked") - @Before - public void setup() - { - SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium(); - serializer = (GenericColumnSerializer) COMPLEX_METRIC_SERDE.getSerializer( - writeOutMedium, - "not-used" - ); - } - @Test public void testSingle() throws Exception { - assertExpected(ImmutableList.of(new SerializablePairLongString(100L, "fuu")), 77); + assertExpected(ImmutableList.of(new SerializablePairLongString(100L, "fuu")), 33, 77); } @Test @@ -78,7 +73,7 @@ public void testLargeString() throws Exception assertExpected(ImmutableList.of(new SerializablePairLongString( 100L, randomStringUtils.randomAlphanumeric(2 * 1024 * 1024) - )), 2103140); + )), 2097182, 2103140); } @Test @@ -95,8 +90,7 @@ public void testCompressable() throws Exception valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringList.get(i % numStrings))); } - //actual input bytes in naive encoding is ~10mb - assertExpected(valueList, 1746026); + assertExpected(valueList, 10440010, 1746026); } @Test @@ -109,8 +103,7 @@ public void testHighlyCompressable() throws Exception valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringValue)); } - //actual input bytes in naive encoding is ~10mb - assertExpected(valueList, 289645); + assertExpected(valueList, 10440010, 289645); } @Test @@ -122,81 +115,109 @@ public void testRandom() throws Exception valueList.add(new SerializablePairLongString(random.nextLong(), randomStringUtils.randomAlphanumeric(1024))); } - assertExpected(valueList, 10428975); + assertExpected(valueList, 10440010, 10428975); } @Test public void testNullString() throws Exception { - assertExpected(ImmutableList.of(new SerializablePairLongString(100L, null)), 74); + assertExpected(ImmutableList.of(new SerializablePairLongString(100L, null)), 30, 74); } @Test public void testEmpty() throws Exception { // minimum size for empty data - assertExpected(Collections.emptyList(), 57); + assertExpected(Collections.emptyList(), 10, 57); } @Test public void testSingleNull() throws Exception { - assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 58); + assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 18, 58); } @Test public void testMultipleNull() throws Exception { - assertExpected(Arrays.asList(null, null, null, null), 59); - } - - private void assertExpected(List expected) throws IOException - { - assertExpected(expected, -1); + assertExpected(Arrays.asList(null, null, null, null), 42, 59); } - private void assertExpected(List expected, int expectedSize) throws IOException + private ByteBuffer assertExpected( + List expected, + int expectedLegacySize, + int expectedCompressedSize + ) throws IOException { - List valueSelectors = - expected.stream().map(SerializablePairLongStringValueSelector::new).collect(Collectors.toList()); - ByteBuffer byteBuffer = serializeAllValuesToByteBuffer(valueSelectors, serializer, expectedSize); - - try (SerializablePairLongStringComplexColumn complexColumn = createComplexColumn(byteBuffer)) { - for (int i = 0; i < valueSelectors.size(); i++) { - Assert.assertEquals(expected.get(i), complexColumn.getRowValue(i)); + SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium(); + ByteBuffer legacyBuffer = serializeAllValuesToByteBuffer( + expected, + LEGACY_SERDE.getSerializer(writeOutMedium, "not-used"), + expectedLegacySize + ).asReadOnlyBuffer(); + ByteBuffer compressedBuffer = serializeAllValuesToByteBuffer( + expected, + COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used"), + expectedCompressedSize + ).asReadOnlyBuffer(); + + try (ComplexColumn legacyCol = createComplexColumn(legacyBuffer); + ComplexColumn compressedCol = createComplexColumn(compressedBuffer) + ) { + for (int i = 0; i < expected.size(); i++) { + Assert.assertEquals(expected.get(i), legacyCol.getRowValue(i)); + Assert.assertEquals(expected.get(i), compressedCol.getRowValue(i)); } } + return compressedBuffer; } - private SerializablePairLongStringComplexColumn createComplexColumn(ByteBuffer byteBuffer) + private ComplexColumn createComplexColumn(ByteBuffer byteBuffer) { ColumnBuilder builder = new ColumnBuilder(); int serializedSize = byteBuffer.remaining(); - COMPLEX_METRIC_SERDE.deserializeColumn(byteBuffer, builder); + LEGACY_SERDE.deserializeColumn(byteBuffer, builder); builder.setType(ValueType.COMPLEX); ColumnHolder columnHolder = builder.build(); - SerializablePairLongStringComplexColumn column = (SerializablePairLongStringComplexColumn) columnHolder.getColumn(); - - Assert.assertEquals(serializedSize, column.getLength()); - Assert.assertEquals("serializablePairLongString", column.getTypeName()); - Assert.assertEquals(SerializablePairLongString.class, column.getClazz()); + final ComplexColumn col = (ComplexColumn) columnHolder.getColumn(); + if (col instanceof SerializablePairLongStringComplexColumn) { + Assert.assertEquals(serializedSize, col.getLength()); + } + Assert.assertEquals("serializablePairLongString", col.getTypeName()); + Assert.assertEquals(SerializablePairLongString.class, col.getClazz()); - return column; + return col; } + @SuppressWarnings({"unchecked", "rawtypes"}) private static ByteBuffer serializeAllValuesToByteBuffer( - Collection valueSelectors, - GenericColumnSerializer serializer, + List values, + GenericColumnSerializer serializer, int expectedSize ) throws IOException { serializer.open(); - for (SerializablePairLongStringValueSelector valueSelector : valueSelectors) { + final AtomicReference reference = new AtomicReference<>(null); + ColumnValueSelector valueSelector = + new SingleObjectColumnValueSelector( + SerializablePairLongString.class + ) + { + @Nullable + @Override + public SerializablePairLongString getObject() + { + return reference.get(); + } + }; + + for (SerializablePairLongString selector : values) { + reference.set(selector); serializer.serialize(valueSelector); } @@ -225,13 +246,4 @@ private static ByteBuffer serializeToByteBuffer( return byteBuffer; } - - private static class SerializablePairLongStringValueSelector - extends SingleValueColumnValueSelector - { - public SerializablePairLongStringValueSelector(SerializablePairLongString value) - { - super(SerializablePairLongString.class, value); - } - } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java b/processing/src/test/java/org/apache/druid/query/aggregation/SingleObjectColumnValueSelector.java similarity index 83% rename from processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java rename to processing/src/test/java/org/apache/druid/query/aggregation/SingleObjectColumnValueSelector.java index 8c9d232ca9d7..5869a6b871c1 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/SingleObjectColumnValueSelector.java @@ -22,17 +22,13 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; -import javax.annotation.Nullable; - -public class SingleValueColumnValueSelector implements ColumnValueSelector +public abstract class SingleObjectColumnValueSelector implements ColumnValueSelector { private final Class valueClass; - private final T value; - public SingleValueColumnValueSelector(Class valueClass, T value) + public SingleObjectColumnValueSelector(Class valueClass) { this.valueClass = valueClass; - this.value = value; } @Override @@ -64,13 +60,6 @@ public boolean isNull() return false; } - @Nullable - @Override - public T getObject() - { - return value; - } - @Override public Class classOfObject() {