From 17a8b9bb4f9765d8ec4a99567350151b69bc62a5 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 26 Jul 2018 01:40:15 -0700 Subject: [PATCH 1/3] Fix 'auto' encoded longs + compression serializer Fixes #6044 changes: * Fixes `VSizeLongSerde` serializers to treat 'close' as 'flush' when used with `BlockLayoutColumnarLongsSerializer`, allowing unwritten values to be flushed to the buffer when the block is compressed * Add exhaustive unit test that flexes a variety of value sizes, row counts, and compression strategies to catch issues such as these : --- .../io/druid/segment/data/VSizeLongSerde.java | 71 +++++---- .../CompressedLongsAutoEncodingSerdeTest.java | 138 ++++++++++++++++++ 2 files changed, 181 insertions(+), 28 deletions(-) create mode 100644 processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java diff --git a/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java b/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java index 4f7abe1d0308..c793924811c2 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java @@ -210,6 +210,8 @@ public Size1Ser(ByteBuffer buffer, int offset) @Override public void write(long value) throws IOException { + curByte = (byte) ((curByte << 1) | (value & 1)); + count++; if (count == 8) { buffer.put(curByte); count = 0; @@ -218,25 +220,28 @@ public void write(long value) throws IOException buffer.rewind(); } } - curByte = (byte) ((curByte << 1) | (value & 1)); - count++; } @Override public void close() throws IOException { - if (closed) { - return; - } - buffer.put((byte) (curByte << (8 - count))); if (output != null) { - output.write(buffer.array()); + if (closed) { + return; + } + if (count > 0) { + buffer.put((byte) (curByte << (8 - count))); + output.write(buffer.array()); + } output.write(EMPTY); output.flush(); + closed = true; } else { + if (count > 0) { + buffer.put((byte) (curByte << (8 - count))); + } buffer.putInt(0); } - closed = true; } } @@ -263,6 +268,9 @@ public Size2Ser(ByteBuffer buffer, int offset) @Override public void write(long value) throws IOException { + curByte = (byte) ((curByte << 2) | (value & 3)); + count += 2; + if (count == 8) { buffer.put(curByte); count = 0; @@ -271,25 +279,28 @@ public void write(long value) throws IOException buffer.rewind(); } } - curByte = (byte) ((curByte << 2) | (value & 3)); - count += 2; } @Override public void close() throws IOException { - if (closed) { - return; - } - buffer.put((byte) (curByte << (8 - count))); if (output != null) { - output.write(buffer.array()); + if (closed) { + return; + } + if (count > 0) { + buffer.put((byte) (curByte << (8 - count))); + output.write(buffer.array()); + } output.write(EMPTY); output.flush(); + closed = true; } else { + if (count > 0) { + buffer.put((byte) (curByte << (8 - count))); + } buffer.putInt(0); } - closed = true; } } @@ -342,20 +353,24 @@ public void write(long value) throws IOException @Override public void close() throws IOException { - if (closed) { - return; - } - if (!first) { - buffer.put((byte) (curByte << 4)); - } + if (output != null) { - output.write(buffer.array(), 0, buffer.position()); + if (closed) { + return; + } + if (!first) { + buffer.put((byte) (curByte << 4)); + output.write(buffer.array(), 0, buffer.position()); + } output.write(EMPTY); output.flush(); + closed = true; } else { + if (!first) { + buffer.put((byte) (curByte << 4)); + } buffer.putInt(0); } - closed = true; } } @@ -395,16 +410,16 @@ public void write(long value) throws IOException @Override public void close() throws IOException { - if (closed) { - return; - } if (output != null) { + if (closed) { + return; + } output.write(EMPTY); output.flush(); + closed = true; } else { buffer.putInt(0); } - closed = true; } } diff --git a/processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java b/processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java new file mode 100644 index 000000000000..a969b34bc098 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/data/CompressedLongsAutoEncodingSerdeTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import io.druid.java.util.common.StringUtils; +import io.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +@RunWith(Parameterized.class) +public class CompressedLongsAutoEncodingSerdeTest +{ + @Parameterized.Parameters(name = "{0} {1} {2}") + public static Iterable compressionStrategies() + { + List data = new ArrayList<>(); + for (long bpv : bitsPerValueParameters) { + for (CompressionStrategy strategy : CompressionStrategy.values()) { + data.add(new Object[]{bpv, strategy, ByteOrder.BIG_ENDIAN}); + data.add(new Object[]{bpv, strategy, ByteOrder.LITTLE_ENDIAN}); + } + } + return data; + } + + private static final long[] bitsPerValueParameters = new long[]{1, 2, 4, 7, 11, 14, 18, 23, 31, 39, 46, 55, 62}; + + protected final CompressionFactory.LongEncodingStrategy encodingStrategy = CompressionFactory.LongEncodingStrategy.AUTO; + protected final CompressionStrategy compressionStrategy; + protected final ByteOrder order; + protected final long bitsPerValue; + + public CompressedLongsAutoEncodingSerdeTest( + long bitsPerValue, + CompressionStrategy compressionStrategy, + ByteOrder order + ) + { + this.bitsPerValue = bitsPerValue; + this.compressionStrategy = compressionStrategy; + this.order = order; + } + + @Test + public void testFidelity() throws Exception + { + final long bound = 1L << bitsPerValue; + // big enough to have at least 2 blocks, and a handful of sizes offset by 1 from each other + int blockSize = 1 << 16; + int numBits = (Long.SIZE - Long.numberOfLeadingZeros(1 << (bitsPerValue - 1))); + double numValuesPerByte = 8.0 / (double) numBits; + + int numRows = (int) (blockSize * numValuesPerByte) * 2 + ThreadLocalRandom.current().nextInt(1, 101); + long chunk[] = new long[numRows]; + for (int i = 0; i < numRows; i++) { + chunk[i] = ThreadLocalRandom.current().nextLong(bound); + } + testValues(chunk); + + numRows++; + chunk = new long[numRows]; + for (int i = 0; i < numRows; i++) { + chunk[i] = ThreadLocalRandom.current().nextLong(bound); + } + testValues(chunk); + } + + public void testValues(long[] values) throws Exception + { + ColumnarLongsSerializer serializer = CompressionFactory.getLongSerializer( + new OffHeapMemorySegmentWriteOutMedium(), + "test", + order, + encodingStrategy, + compressionStrategy + ); + serializer.open(); + + for (long value : values) { + serializer.add(value); + } + Assert.assertEquals(values.length, serializer.size()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializer.writeTo(Channels.newChannel(baos), null); + Assert.assertEquals(baos.size(), serializer.getSerializedSize()); + CompressedColumnarLongsSupplier supplier = + CompressedColumnarLongsSupplier.fromByteBuffer(ByteBuffer.wrap(baos.toByteArray()), order); + ColumnarLongs longs = supplier.get(); + + assertIndexMatchesVals(longs, values); + longs.close(); + } + + private void assertIndexMatchesVals(ColumnarLongs indexed, long[] vals) + { + Assert.assertEquals(vals.length, indexed.size()); + for (int i = 0; i < indexed.size(); ++i) { + Assert.assertEquals( + StringUtils.format( + "Value [%d] at row '%d' does not match [%d]", + indexed.get(i), + i, + vals[i] + ), + vals[i], + indexed.get(i) + ); + } + } +} From 00dcd3e46fb38d55c9df8835a28b6c9579ee4ab5 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 27 Jul 2018 14:52:08 -0700 Subject: [PATCH 2/3] refactor LongSerializer close to be named flush instead --- .../druid/segment/data/DeltaLongEncodingWriter.java | 2 +- .../druid/segment/data/TableLongEncodingWriter.java | 2 +- .../java/io/druid/segment/data/VSizeLongSerde.java | 12 ++++++------ .../io/druid/segment/data/VSizeLongSerdeTest.java | 8 ++++---- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/data/DeltaLongEncodingWriter.java b/processing/src/main/java/io/druid/segment/data/DeltaLongEncodingWriter.java index 7fe714adb545..1735b698c146 100644 --- a/processing/src/main/java/io/druid/segment/data/DeltaLongEncodingWriter.java +++ b/processing/src/main/java/io/druid/segment/data/DeltaLongEncodingWriter.java @@ -87,7 +87,7 @@ public int getNumBytes(int values) public void flush() throws IOException { if (serializer != null) { - serializer.close(); + serializer.flush(); } } } diff --git a/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java b/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java index 633492801ab4..ed3420864237 100644 --- a/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java +++ b/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java @@ -67,7 +67,7 @@ public void write(long value) throws IOException public void flush() throws IOException { if (serializer != null) { - serializer.close(); + serializer.flush(); } } diff --git a/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java b/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java index c793924811c2..933dfd96fabf 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java @@ -21,7 +21,6 @@ import io.druid.java.util.common.IAE; -import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -182,9 +181,10 @@ public static LongDeserializer getDeserializer(int longSize, ByteBuffer fromBuff } } - public interface LongSerializer extends Closeable + public interface LongSerializer { void write(long value) throws IOException; + void flush() throws IOException; } private static final class Size1Ser implements LongSerializer @@ -223,7 +223,7 @@ public void write(long value) throws IOException } @Override - public void close() throws IOException + public void flush() throws IOException { if (output != null) { if (closed) { @@ -282,7 +282,7 @@ public void write(long value) throws IOException } @Override - public void close() throws IOException + public void flush() throws IOException { if (output != null) { if (closed) { @@ -351,7 +351,7 @@ public void write(long value) throws IOException } @Override - public void close() throws IOException + public void flush() throws IOException { if (output != null) { @@ -408,7 +408,7 @@ public void write(long value) throws IOException } @Override - public void close() throws IOException + public void flush() throws IOException { if (output != null) { if (closed) { diff --git a/processing/src/test/java/io/druid/segment/data/VSizeLongSerdeTest.java b/processing/src/test/java/io/druid/segment/data/VSizeLongSerdeTest.java index 162e1493b85e..bff862cc2601 100644 --- a/processing/src/test/java/io/druid/segment/data/VSizeLongSerdeTest.java +++ b/processing/src/test/java/io/druid/segment/data/VSizeLongSerdeTest.java @@ -109,8 +109,8 @@ public void testSerde(int longSize, long[] values) throws IOException streamSer.write(value); bufferSer.write(value); } - streamSer.close(); - bufferSer.close(); + streamSer.flush(); + bufferSer.flush(); buffer = ByteBuffer.wrap(outStream.toByteArray()); Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, values.length), buffer.capacity()); @@ -133,8 +133,8 @@ public void testSerdeIncLoop(int longSize, long start, long end) throws IOExcept streamSer.write(i); bufferSer.write(i); } - streamSer.close(); - bufferSer.close(); + streamSer.flush(); + bufferSer.flush(); buffer = ByteBuffer.wrap(outStream.toByteArray()); Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, (int) (end - start)), buffer.capacity()); From 6e43488e50c393f680ad60ea39ce2c5b72b2ffd5 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 27 Jul 2018 18:02:25 -0700 Subject: [PATCH 3/3] revert and just make new serializers per block --- .../BlockLayoutColumnarLongsSerializer.java | 1 + .../segment/data/DeltaLongEncodingWriter.java | 2 +- .../segment/data/TableLongEncodingWriter.java | 2 +- .../io/druid/segment/data/VSizeLongSerde.java | 83 ++++++++----------- .../segment/data/VSizeLongSerdeTest.java | 8 +- 5 files changed, 41 insertions(+), 55 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java b/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java index f9b5cffea401..1031bc25b68b 100644 --- a/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java +++ b/processing/src/main/java/io/druid/segment/data/BlockLayoutColumnarLongsSerializer.java @@ -92,6 +92,7 @@ public void add(long value) throws IOException endBuffer.flip(); flattener.write(endBuffer); endBuffer.clear(); + writer.setBuffer(endBuffer); } writer.write(value); diff --git a/processing/src/main/java/io/druid/segment/data/DeltaLongEncodingWriter.java b/processing/src/main/java/io/druid/segment/data/DeltaLongEncodingWriter.java index 1735b698c146..7fe714adb545 100644 --- a/processing/src/main/java/io/druid/segment/data/DeltaLongEncodingWriter.java +++ b/processing/src/main/java/io/druid/segment/data/DeltaLongEncodingWriter.java @@ -87,7 +87,7 @@ public int getNumBytes(int values) public void flush() throws IOException { if (serializer != null) { - serializer.flush(); + serializer.close(); } } } diff --git a/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java b/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java index ed3420864237..633492801ab4 100644 --- a/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java +++ b/processing/src/main/java/io/druid/segment/data/TableLongEncodingWriter.java @@ -67,7 +67,7 @@ public void write(long value) throws IOException public void flush() throws IOException { if (serializer != null) { - serializer.flush(); + serializer.close(); } } diff --git a/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java b/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java index 933dfd96fabf..4f7abe1d0308 100644 --- a/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java +++ b/processing/src/main/java/io/druid/segment/data/VSizeLongSerde.java @@ -21,6 +21,7 @@ import io.druid.java.util.common.IAE; +import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -181,10 +182,9 @@ public static LongDeserializer getDeserializer(int longSize, ByteBuffer fromBuff } } - public interface LongSerializer + public interface LongSerializer extends Closeable { void write(long value) throws IOException; - void flush() throws IOException; } private static final class Size1Ser implements LongSerializer @@ -210,8 +210,6 @@ public Size1Ser(ByteBuffer buffer, int offset) @Override public void write(long value) throws IOException { - curByte = (byte) ((curByte << 1) | (value & 1)); - count++; if (count == 8) { buffer.put(curByte); count = 0; @@ -220,28 +218,25 @@ public void write(long value) throws IOException buffer.rewind(); } } + curByte = (byte) ((curByte << 1) | (value & 1)); + count++; } @Override - public void flush() throws IOException + public void close() throws IOException { + if (closed) { + return; + } + buffer.put((byte) (curByte << (8 - count))); if (output != null) { - if (closed) { - return; - } - if (count > 0) { - buffer.put((byte) (curByte << (8 - count))); - output.write(buffer.array()); - } + output.write(buffer.array()); output.write(EMPTY); output.flush(); - closed = true; } else { - if (count > 0) { - buffer.put((byte) (curByte << (8 - count))); - } buffer.putInt(0); } + closed = true; } } @@ -268,9 +263,6 @@ public Size2Ser(ByteBuffer buffer, int offset) @Override public void write(long value) throws IOException { - curByte = (byte) ((curByte << 2) | (value & 3)); - count += 2; - if (count == 8) { buffer.put(curByte); count = 0; @@ -279,28 +271,25 @@ public void write(long value) throws IOException buffer.rewind(); } } + curByte = (byte) ((curByte << 2) | (value & 3)); + count += 2; } @Override - public void flush() throws IOException + public void close() throws IOException { + if (closed) { + return; + } + buffer.put((byte) (curByte << (8 - count))); if (output != null) { - if (closed) { - return; - } - if (count > 0) { - buffer.put((byte) (curByte << (8 - count))); - output.write(buffer.array()); - } + output.write(buffer.array()); output.write(EMPTY); output.flush(); - closed = true; } else { - if (count > 0) { - buffer.put((byte) (curByte << (8 - count))); - } buffer.putInt(0); } + closed = true; } } @@ -351,26 +340,22 @@ public void write(long value) throws IOException } @Override - public void flush() throws IOException + public void close() throws IOException { - + if (closed) { + return; + } + if (!first) { + buffer.put((byte) (curByte << 4)); + } if (output != null) { - if (closed) { - return; - } - if (!first) { - buffer.put((byte) (curByte << 4)); - output.write(buffer.array(), 0, buffer.position()); - } + output.write(buffer.array(), 0, buffer.position()); output.write(EMPTY); output.flush(); - closed = true; } else { - if (!first) { - buffer.put((byte) (curByte << 4)); - } buffer.putInt(0); } + closed = true; } } @@ -408,18 +393,18 @@ public void write(long value) throws IOException } @Override - public void flush() throws IOException + public void close() throws IOException { + if (closed) { + return; + } if (output != null) { - if (closed) { - return; - } output.write(EMPTY); output.flush(); - closed = true; } else { buffer.putInt(0); } + closed = true; } } diff --git a/processing/src/test/java/io/druid/segment/data/VSizeLongSerdeTest.java b/processing/src/test/java/io/druid/segment/data/VSizeLongSerdeTest.java index bff862cc2601..162e1493b85e 100644 --- a/processing/src/test/java/io/druid/segment/data/VSizeLongSerdeTest.java +++ b/processing/src/test/java/io/druid/segment/data/VSizeLongSerdeTest.java @@ -109,8 +109,8 @@ public void testSerde(int longSize, long[] values) throws IOException streamSer.write(value); bufferSer.write(value); } - streamSer.flush(); - bufferSer.flush(); + streamSer.close(); + bufferSer.close(); buffer = ByteBuffer.wrap(outStream.toByteArray()); Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, values.length), buffer.capacity()); @@ -133,8 +133,8 @@ public void testSerdeIncLoop(int longSize, long start, long end) throws IOExcept streamSer.write(i); bufferSer.write(i); } - streamSer.flush(); - bufferSer.flush(); + streamSer.close(); + bufferSer.close(); buffer = ByteBuffer.wrap(outStream.toByteArray()); Assert.assertEquals(VSizeLongSerde.getSerializedSize(longSize, (int) (end - start)), buffer.capacity());