diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java index b6d7d029be4e..a366675dc035 100644 --- a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java +++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumn.java @@ -30,7 +30,6 @@ import org.apache.druid.segment.data.ReadableOffset; import javax.annotation.Nullable; - import java.io.IOException; /** @@ -40,6 +39,7 @@ public class CompressedBigDecimalColumn implements ComplexColumn { public static final Logger LOGGER = new Logger(CompressedBigDecimalColumn.class); + private final int length; private final ColumnarInts scale; private final ColumnarMultiInts magnitude; @@ -49,8 +49,9 @@ public class CompressedBigDecimalColumn implements ComplexColumn * @param scale scale of the rows * @param magnitude LongColumn representing magnitudes */ - public CompressedBigDecimalColumn(ColumnarInts scale, ColumnarMultiInts magnitude) + public CompressedBigDecimalColumn(int length, ColumnarInts scale, ColumnarMultiInts magnitude) { + this.length = length; this.scale = scale; this.magnitude = magnitude; } @@ -87,7 +88,7 @@ public CompressedBigDecimal getRowValue(int rowNum) @Override public int getLength() { - return scale.size(); + return length; } @Override diff --git a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java index 0b58827e8a79..c51fbc3384e2 100644 --- 
a/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java +++ b/extensions-contrib/compressed-bigdecimal/src/main/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnPartSupplier.java @@ -26,6 +26,7 @@ import org.apache.druid.segment.column.ComplexColumn; import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSupplier; import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSupplier; + import java.nio.ByteBuffer; /** @@ -33,27 +34,8 @@ */ public class CompressedBigDecimalColumnPartSupplier implements Supplier { - public static final int VERSION = 0x1; - private final CompressedVSizeColumnarIntsSupplier scaleSupplier; - private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier; - - /** - * Constructor. - * - * @param scaleSupplier scale supplier - * @param magnitudeSupplier supplied of results - */ - public CompressedBigDecimalColumnPartSupplier( - CompressedVSizeColumnarIntsSupplier scaleSupplier, - V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier - ) - { - this.scaleSupplier = scaleSupplier; - this.magnitudeSupplier = magnitudeSupplier; - } - /** * Compressed. 
* @@ -67,23 +49,50 @@ public static CompressedBigDecimalColumnPartSupplier fromByteBuffer( byte versionFromBuffer = buffer.get(); if (versionFromBuffer == VERSION) { + int positionStart = buffer.position(); CompressedVSizeColumnarIntsSupplier scaleSupplier = CompressedVSizeColumnarIntsSupplier.fromByteBuffer( buffer, - IndexIO.BYTE_ORDER); + IndexIO.BYTE_ORDER + ); V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier = V3CompressedVSizeColumnarMultiIntsSupplier.fromByteBuffer(buffer, IndexIO.BYTE_ORDER); - return new CompressedBigDecimalColumnPartSupplier(scaleSupplier, magnitudeSupplier); + return new CompressedBigDecimalColumnPartSupplier( + buffer.position() - positionStart, + scaleSupplier, + magnitudeSupplier + ); } else { throw new IAE("Unknown version[%s]", versionFromBuffer); } } + private final int byteSize; + private final CompressedVSizeColumnarIntsSupplier scaleSupplier; + private final V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier; + + /** + * Constructor. 
+ * + * @param scaleSupplier scale supplier + * @param magnitudeSupplier supplier of results + */ + public CompressedBigDecimalColumnPartSupplier( + int byteSize, + CompressedVSizeColumnarIntsSupplier scaleSupplier, + V3CompressedVSizeColumnarMultiIntsSupplier magnitudeSupplier + ) + { + this.byteSize = byteSize; + this.scaleSupplier = scaleSupplier; + this.magnitudeSupplier = magnitudeSupplier; + } + @Override public ComplexColumn get() { - return new CompressedBigDecimalColumn(scaleSupplier.get(), magnitudeSupplier.get()); + return new CompressedBigDecimalColumn(byteSize, scaleSupplier.get(), magnitudeSupplier.get()); } } diff --git a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java index b779c6318021..d9a3f6e97141 100644 --- a/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java +++ b/extensions-contrib/compressed-bigdecimal/src/test/java/org/apache/druid/compressedbigdecimal/CompressedBigDecimalColumnTest.java @@ -35,6 +35,7 @@ public void testCompressedBigDecimalColumn() ColumnarInts columnarInts = EasyMock.createMock(ColumnarInts.class); ReadableOffset readableOffset = EasyMock.createMock(ReadableOffset.class); CompressedBigDecimalColumn compressedBigDecimalColumn = new CompressedBigDecimalColumn( + 12345, columnarInts, columnarMultiInts ); @@ -42,7 +43,7 @@ public void testCompressedBigDecimalColumn() CompressedBigDecimalModule.COMPRESSED_BIG_DECIMAL, compressedBigDecimalColumn.getTypeName() ); - Assert.assertEquals(0, compressedBigDecimalColumn.getLength()); + Assert.assertEquals(12345, compressedBigDecimalColumn.getLength()); Assert.assertEquals(CompressedBigDecimalColumn.class, compressedBigDecimalColumn.getClazz()); 
Assert.assertNotNull(compressedBigDecimalColumn.makeColumnValueSelector(readableOffset)); } diff --git a/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java b/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java index 1f6396badd29..f7b662d42bb2 100644 --- a/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java +++ b/processing/src/main/java/org/apache/druid/frame/read/columnar/ComplexFrameColumnReader.java @@ -175,7 +175,7 @@ public Object getRowValue(int rowNum) @Override public int getLength() { - return frame.numRows(); + return (int) frame.numBytes(); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java index deb1623701c0..2cc60843f9e7 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerde.java @@ -21,6 +21,7 @@ import org.apache.druid.collections.SerializablePair; import org.apache.druid.data.input.InputRow; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.column.ColumnBuilder; import org.apache.druid.segment.data.GenericIndexed; @@ -28,6 +29,7 @@ import org.apache.druid.segment.serde.ComplexColumnPartSupplier; import org.apache.druid.segment.serde.ComplexMetricExtractor; import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; @@ -49,6 +51,20 @@ */ public class 
SerializablePairLongStringComplexMetricSerde extends ComplexMetricSerde { + /** + * This is a configuration parameter to allow for turning on compression. It is a hack, it would be significantly + * better if this could be delivered via properties. The number one reason this is a hack is because it reads + * the System.getProperty which doesn't actually have runtime.properties files put into it, so this setting + * could be set in runtime.properties and this code wouldn't see it, because that's not how it is wired up. + * + * The intent of this parameter is so that Druid 25 can be released using the legacy serialization format. This + * will allow us to get code released that can *read* both the legacy and the new format. Then, in Druid 26, + * we can completely eliminate this boolean and start to only *write* the new format, in which case this + * hack of a configuration property disappears. + */ + private static final boolean COMPRESSION_ENABLED = + Boolean.parseBoolean(System.getProperty("druid.columns.pairLongString.compressed", "false")); + public static final int EXPECTED_VERSION = 3; public static final String TYPE_NAME = "serializablePairLongString"; // Null SerializablePairLongString values are put first @@ -60,6 +76,17 @@ public class SerializablePairLongStringComplexMetricSerde extends ComplexMetricS private static final SerializablePairLongStringSimpleStagedSerde SERDE = new SerializablePairLongStringSimpleStagedSerde(); + private final boolean compressionEnabled; + + public SerializablePairLongStringComplexMetricSerde() + { + this(COMPRESSION_ENABLED); + } + + public SerializablePairLongStringComplexMetricSerde(boolean compressionEnabled) + { + this.compressionEnabled = compressionEnabled; + } @Override public String getTypeName() @@ -92,7 +119,7 @@ public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder) byte version = buffer.get(buffer.position()); if (version == 0 || version == 1 || version == 2) { - GenericIndexed column = 
GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper()); + GenericIndexed column = GenericIndexed.read(buffer, LEGACY_STRATEGY, columnBuilder.getFileMapper()); columnBuilder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column)); } else { SerializablePairLongStringComplexColumn.Builder builder = @@ -141,9 +168,73 @@ public byte[] toBytes(SerializablePairLongString val) @Override public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) { - return new SerializablePairLongStringColumnSerializer( - segmentWriteOutMedium, - NativeClearedByteBufferProvider.INSTANCE - ); + if (compressionEnabled) { + return new SerializablePairLongStringColumnSerializer( + segmentWriteOutMedium, + NativeClearedByteBufferProvider.INSTANCE + ); + } else { + return LargeColumnSupportedComplexColumnSerializer.create( + segmentWriteOutMedium, + column, + LEGACY_STRATEGY + ); + } } + + private static final ObjectStrategy LEGACY_STRATEGY = + new ObjectStrategy() + { + @Override + public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2) + { + return COMPARATOR.compare(o1, o2); + } + + @Override + public Class getClazz() + { + return SerializablePairLongString.class; + } + + @Override + public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes) + { + final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); + + long lhs = readOnlyBuffer.getLong(); + int stringSize = readOnlyBuffer.getInt(); + + String lastString = null; + if (stringSize > 0) { + byte[] stringBytes = new byte[stringSize]; + readOnlyBuffer.get(stringBytes, 0, stringSize); + lastString = StringUtils.fromUtf8(stringBytes); + } + + return new SerializablePairLongString(lhs, lastString); + } + + @Override + public byte[] toBytes(SerializablePairLongString val) + { + String rhsString = val.rhs; + ByteBuffer bbuf; + + if (rhsString != null) { + byte[] rhsBytes = 
StringUtils.toUtf8(rhsString); + bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + rhsBytes.length); + bbuf.putLong(val.lhs); + bbuf.putInt(Long.BYTES, rhsBytes.length); + bbuf.position(Long.BYTES + Integer.BYTES); + bbuf.put(rhsBytes); + } else { + bbuf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES); + bbuf.putLong(val.lhs); + bbuf.putInt(Long.BYTES, 0); + } + + return bbuf.array(); + } + }; } diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java index 88cc5dcb8fe5..041d8566d35a 100644 --- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java +++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentAnalyzer.java @@ -96,7 +96,7 @@ public Map analyze(Segment segment) final StorageAdapter storageAdapter = segment.asStorageAdapter(); // get length and column names from storageAdapter - final int length = storageAdapter.getNumRows(); + final int numRows = storageAdapter.getNumRows(); // Use LinkedHashMap to preserve column order. final Map columns = new LinkedHashMap<>(); @@ -119,13 +119,13 @@ public Map analyze(Segment segment) final int bytesPerRow = ColumnHolder.TIME_COLUMN_NAME.equals(columnName) ? NUM_BYTES_IN_TIMESTAMP : Long.BYTES; - analysis = analyzeNumericColumn(capabilities, length, bytesPerRow); + analysis = analyzeNumericColumn(capabilities, numRows, bytesPerRow); break; case FLOAT: - analysis = analyzeNumericColumn(capabilities, length, NUM_BYTES_IN_TEXT_FLOAT); + analysis = analyzeNumericColumn(capabilities, numRows, NUM_BYTES_IN_TEXT_FLOAT); break; case DOUBLE: - analysis = analyzeNumericColumn(capabilities, length, Double.BYTES); + analysis = analyzeNumericColumn(capabilities, numRows, Double.BYTES); break; case STRING: if (index != null) { @@ -136,7 +136,7 @@ public Map analyze(Segment segment) break; case COMPLEX: final ColumnHolder columnHolder = index != null ? 
index.getColumnHolder(columnName) : null; - analysis = analyzeComplexColumn(capabilities, columnHolder); + analysis = analyzeComplexColumn(capabilities, numRows, columnHolder); break; default: log.warn("Unknown column type[%s].", capabilities.asTypeString()); @@ -330,6 +330,7 @@ public Long accumulate(Long accumulated, Cursor cursor) private ColumnAnalysis analyzeComplexColumn( @Nullable final ColumnCapabilities capabilities, + final int numCells, @Nullable final ColumnHolder columnHolder ) { @@ -362,8 +363,7 @@ private ColumnAnalysis analyzeComplexColumn( ); } - final int length = complexColumn.getLength(); - for (int i = 0; i < length; ++i) { + for (int i = 0; i < numCells; ++i) { size += inputSizeFn.apply(complexColumn.getRowValue(i)); } } diff --git a/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java index 0122f1e53577..b0f3a4388c83 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/ComplexColumn.java @@ -58,7 +58,7 @@ public interface ComplexColumn extends BaseColumn Object getRowValue(int rowNum); /** - * @return serialized size (in bytes) of this column. + * @return serialized size (in bytes) of this column. 
-1 for unknown */ int getLength(); diff --git a/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java index f3ec1341173a..4b1654d88843 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/GenericIndexedBasedComplexColumn.java @@ -58,7 +58,7 @@ public Object getRowValue(int rowNum) @Override public int getLength() { - return index.size(); + return -1; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java index ce4db70438ef..3f11b779eaf7 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/column/UnknownTypeComplexColumn.java @@ -59,7 +59,7 @@ public Object getRowValue(int rowNum) @Override public int getLength() { - return 0; + return -1; } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java index e062d1e94ce2..c4682d20711b 100644 --- a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java +++ b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java @@ -281,7 +281,7 @@ public int getMaxVectorSize() @Override public int getLength() { - return 0; + return -1; } @Override diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java index 
c8605f4695b8..f503874a4d6b 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/SerializablePairLongStringComplexMetricSerdeTest.java @@ -20,55 +20,50 @@ package org.apache.druid.query.aggregation; import com.google.common.collect.ImmutableList; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.GenericColumnSerializer; import org.apache.druid.segment.column.ColumnBuilder; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ComplexColumn; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.serde.cell.RandomStringUtils; import org.apache.druid.segment.writeout.HeapByteBufferWriteOutBytes; import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium; import org.apache.druid.segment.writeout.SegmentWriteOutMedium; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Random; -import java.util.stream.Collectors; +import java.util.concurrent.atomic.AtomicReference; public class SerializablePairLongStringComplexMetricSerdeTest { - private static final SerializablePairLongStringComplexMetricSerde COMPLEX_METRIC_SERDE = + static { + NullHandling.initializeForTests(); + } + + private static final SerializablePairLongStringComplexMetricSerde LEGACY_SERDE = new SerializablePairLongStringComplexMetricSerde(); + private static final SerializablePairLongStringComplexMetricSerde COMPRESSED_SERDE = + new SerializablePairLongStringComplexMetricSerde(true); // want deterministic test input 
private final Random random = new Random(0); private final RandomStringUtils randomStringUtils = new RandomStringUtils(random); - private GenericColumnSerializer serializer; - - @SuppressWarnings("unchecked") - @Before - public void setup() - { - SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium(); - serializer = (GenericColumnSerializer) COMPLEX_METRIC_SERDE.getSerializer( - writeOutMedium, - "not-used" - ); - } - @Test public void testSingle() throws Exception { - assertExpected(ImmutableList.of(new SerializablePairLongString(100L, "fuu")), 77); + assertExpected(ImmutableList.of(new SerializablePairLongString(100L, "fuu")), 33, 77); } @Test @@ -78,7 +73,7 @@ public void testLargeString() throws Exception assertExpected(ImmutableList.of(new SerializablePairLongString( 100L, randomStringUtils.randomAlphanumeric(2 * 1024 * 1024) - )), 2103140); + )), 2097182, 2103140); } @Test @@ -95,8 +90,7 @@ public void testCompressable() throws Exception valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringList.get(i % numStrings))); } - //actual input bytes in naive encoding is ~10mb - assertExpected(valueList, 1746026); + assertExpected(valueList, 10440010, 1746026); } @Test @@ -109,8 +103,7 @@ public void testHighlyCompressable() throws Exception valueList.add(new SerializablePairLongString(Integer.MAX_VALUE + (long) i, stringValue)); } - //actual input bytes in naive encoding is ~10mb - assertExpected(valueList, 289645); + assertExpected(valueList, 10440010, 289645); } @Test @@ -122,81 +115,109 @@ public void testRandom() throws Exception valueList.add(new SerializablePairLongString(random.nextLong(), randomStringUtils.randomAlphanumeric(1024))); } - assertExpected(valueList, 10428975); + assertExpected(valueList, 10440010, 10428975); } @Test public void testNullString() throws Exception { - assertExpected(ImmutableList.of(new SerializablePairLongString(100L, null)), 74); + assertExpected(ImmutableList.of(new 
SerializablePairLongString(100L, null)), 30, 74); } @Test public void testEmpty() throws Exception { // minimum size for empty data - assertExpected(Collections.emptyList(), 57); + assertExpected(Collections.emptyList(), 10, 57); } @Test public void testSingleNull() throws Exception { - assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 58); + assertExpected(Arrays.asList(new SerializablePairLongString[]{null}), 18, 58); } @Test public void testMultipleNull() throws Exception { - assertExpected(Arrays.asList(null, null, null, null), 59); - } - - private void assertExpected(List expected) throws IOException - { - assertExpected(expected, -1); + assertExpected(Arrays.asList(null, null, null, null), 42, 59); } - private void assertExpected(List expected, int expectedSize) throws IOException + private ByteBuffer assertExpected( + List expected, + int expectedLegacySize, + int expectedCompressedSize + ) throws IOException { - List valueSelectors = - expected.stream().map(SerializablePairLongStringValueSelector::new).collect(Collectors.toList()); - ByteBuffer byteBuffer = serializeAllValuesToByteBuffer(valueSelectors, serializer, expectedSize); - - try (SerializablePairLongStringComplexColumn complexColumn = createComplexColumn(byteBuffer)) { - for (int i = 0; i < valueSelectors.size(); i++) { - Assert.assertEquals(expected.get(i), complexColumn.getRowValue(i)); + SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium(); + ByteBuffer legacyBuffer = serializeAllValuesToByteBuffer( + expected, + LEGACY_SERDE.getSerializer(writeOutMedium, "not-used"), + expectedLegacySize + ).asReadOnlyBuffer(); + ByteBuffer compressedBuffer = serializeAllValuesToByteBuffer( + expected, + COMPRESSED_SERDE.getSerializer(writeOutMedium, "not-used"), + expectedCompressedSize + ).asReadOnlyBuffer(); + + try (ComplexColumn legacyCol = createComplexColumn(legacyBuffer); + ComplexColumn compressedCol = createComplexColumn(compressedBuffer) + ) { + for (int 
i = 0; i < expected.size(); i++) { + Assert.assertEquals(expected.get(i), legacyCol.getRowValue(i)); + Assert.assertEquals(expected.get(i), compressedCol.getRowValue(i)); } } + return compressedBuffer; } - private SerializablePairLongStringComplexColumn createComplexColumn(ByteBuffer byteBuffer) + private ComplexColumn createComplexColumn(ByteBuffer byteBuffer) { ColumnBuilder builder = new ColumnBuilder(); int serializedSize = byteBuffer.remaining(); - COMPLEX_METRIC_SERDE.deserializeColumn(byteBuffer, builder); + LEGACY_SERDE.deserializeColumn(byteBuffer, builder); builder.setType(ValueType.COMPLEX); ColumnHolder columnHolder = builder.build(); - SerializablePairLongStringComplexColumn column = (SerializablePairLongStringComplexColumn) columnHolder.getColumn(); - - Assert.assertEquals(serializedSize, column.getLength()); - Assert.assertEquals("serializablePairLongString", column.getTypeName()); - Assert.assertEquals(SerializablePairLongString.class, column.getClazz()); + final ComplexColumn col = (ComplexColumn) columnHolder.getColumn(); + if (col instanceof SerializablePairLongStringComplexColumn) { + Assert.assertEquals(serializedSize, col.getLength()); + } + Assert.assertEquals("serializablePairLongString", col.getTypeName()); + Assert.assertEquals(SerializablePairLongString.class, col.getClazz()); - return column; + return col; } + @SuppressWarnings({"unchecked", "rawtypes"}) private static ByteBuffer serializeAllValuesToByteBuffer( - Collection valueSelectors, - GenericColumnSerializer serializer, + List values, + GenericColumnSerializer serializer, int expectedSize ) throws IOException { serializer.open(); - for (SerializablePairLongStringValueSelector valueSelector : valueSelectors) { + final AtomicReference reference = new AtomicReference<>(null); + ColumnValueSelector valueSelector = + new SingleObjectColumnValueSelector( + SerializablePairLongString.class + ) + { + @Nullable + @Override + public SerializablePairLongString getObject() + { + return 
reference.get(); + } + }; + + for (SerializablePairLongString selector : values) { + reference.set(selector); serializer.serialize(valueSelector); } @@ -225,13 +246,4 @@ private static ByteBuffer serializeToByteBuffer( return byteBuffer; } - - private static class SerializablePairLongStringValueSelector - extends SingleValueColumnValueSelector - { - public SerializablePairLongStringValueSelector(SerializablePairLongString value) - { - super(SerializablePairLongString.class, value); - } - } } diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java b/processing/src/test/java/org/apache/druid/query/aggregation/SingleObjectColumnValueSelector.java similarity index 83% rename from processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java rename to processing/src/test/java/org/apache/druid/query/aggregation/SingleObjectColumnValueSelector.java index 8c9d232ca9d7..5869a6b871c1 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/SingleValueColumnValueSelector.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/SingleObjectColumnValueSelector.java @@ -22,17 +22,13 @@ import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; -import javax.annotation.Nullable; - -public class SingleValueColumnValueSelector implements ColumnValueSelector +public abstract class SingleObjectColumnValueSelector implements ColumnValueSelector { private final Class valueClass; - private final T value; - public SingleValueColumnValueSelector(Class valueClass, T value) + public SingleObjectColumnValueSelector(Class valueClass) { this.valueClass = valueClass; - this.value = value; } @Override @@ -64,13 +60,6 @@ public boolean isNull() return false; } - @Nullable - @Override - public T getObject() - { - return value; - } - @Override public Class classOfObject() {