From bb4a06b0467e1d3f69861b1a0467f8aec51ee5f2 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 7 May 2021 12:35:18 -0700 Subject: [PATCH 01/12] HBASE-25869 WAL value compression WAL storage can be expensive, especially if the cell values represented in the edits are large, consisting of blobs or significant lengths of text. Such WALs might need to be kept around for a fairly long time to satisfy replication constraints on a space limited (or space-contended) filesystem. We have a custom dictionary compression scheme for cell metadata that is engaged when WAL compression is enabled in site configuration. This is fine for that application, where we can expect the universe of values and their lengths in the custom dictionaries to be constrained. For arbitrary cell values it is better to use Deflate compression, which is a complete LZ-class algorithm suitable for arbitrary albeit compressible data, is reasonably fast, certainly fast enough for WALs, compresses well, and is universally available as part of the Java runtime. With a trick that encodes whether or not the cell value is compressed in the high order bit of the type byte, this can be done in a backwards compatible manner. --- .../org/apache/hadoop/hbase/KeyValue.java | 5 + .../src/main/protobuf/server/region/WAL.proto | 1 + .../wal/AbstractProtobufLogWriter.java | 12 +- .../regionserver/wal/CompressionContext.java | 30 +++- .../regionserver/wal/ProtobufLogReader.java | 7 + .../hbase/regionserver/wal/ReaderBase.java | 7 +- .../hbase/regionserver/wal/WALCellCodec.java | 92 ++++++++++-- .../wal/TestWALCellCodecWithCompression.java | 136 +++++++++++++++--- 8 files changed, 255 insertions(+), 35 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index bc520b62593d..431585153503 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -230,6 +230,11 @@ public static enum Type { DeleteColumn((byte)12), DeleteFamily((byte)14), + // Effective maximum is 127 (Byte.MAX_VALUE). We set the high order bit of the + // type byte in the WAL codecs to indicate, in a backwards compatible way, if the + // value is compressed there. + EffectiveMaximum((byte)Byte.MAX_VALUE), + // Maximum is used when searching; you look from maximum on down. 
Maximum((byte)255); diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto index fd622cfc5ba1..2add88bda1f0 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto @@ -32,6 +32,7 @@ message WALHeader { optional bool has_tag_compression = 3; optional string writer_cls_name = 4; optional string cell_codec_cls_name = 5; + optional bool has_value_compression = 6; } /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index a94e5c8f02e5..1e6792158c1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -146,7 +146,8 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro try { this.compressionContext = new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path), - conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true)); + conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true), + conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false)); } catch (Exception e) { throw new IOException("Failed to initiate CompressionContext", e); } @@ -167,8 +168,13 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita boolean doTagCompress = doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); - length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf, - WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)))); + boolean doValueCompress = doCompress + && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); + length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, + buildWALHeader(conf, WALHeader.newBuilder() + .setHasCompression(doCompress) + .setHasTagCompression(doTagCompress) + .setHasValueCompression(doValueCompress)))); initAfterHeader(doCompress); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 16866e18a7cf..5ed7a636402a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.EnumMap; import java.util.Map; +import java.util.zip.Deflater; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; @@ -37,6 +38,9 @@ public class CompressionContext { static final String ENABLE_WAL_TAGS_COMPRESSION = "hbase.regionserver.wal.tags.enablecompression"; + static final String ENABLE_WAL_VALUE_COMPRESSION = + "hbase.regionserver.wal.value.enablecompression"; + public enum DictionaryIndex { REGION, TABLE, FAMILY, QUALIFIER, ROW } @@ -45,10 +49,12 @@ public enum DictionaryIndex { new EnumMap<>(DictionaryIndex.class); // Context used for compressing tags TagCompressionContext tagCompressionContext = null; + Deflater valueCompressor = null; public 
CompressionContext(Class dictType, boolean recoveredEdits, - boolean hasTagCompression) throws SecurityException, NoSuchMethodException, - InstantiationException, IllegalAccessException, InvocationTargetException { + boolean hasTagCompression, boolean hasValueCompression) + throws SecurityException, NoSuchMethodException, InstantiationException, + IllegalAccessException, InvocationTargetException { Constructor dictConstructor = dictType.getConstructor(); for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) { @@ -70,12 +76,28 @@ public CompressionContext(Class dictType, boolean recovere if (hasTagCompression) { tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE); } + if (hasValueCompression) { + valueCompressor = new Deflater(); + // Optimize for encoding speed + valueCompressor.setLevel(Deflater.BEST_SPEED); + } + } + + public CompressionContext(Class dictType, boolean recoveredEdits, + boolean hasTagCompression) + throws SecurityException, NoSuchMethodException, InstantiationException, + IllegalAccessException, InvocationTargetException { + this(dictType, recoveredEdits, hasTagCompression, false); } public Dictionary getDictionary(Enum dictIndex) { return dictionaries.get(dictIndex); } + public Deflater getValueCompressor() { + return valueCompressor; + } + void clear() { for(Dictionary dictionary : dictionaries.values()){ dictionary.clear(); @@ -83,5 +105,9 @@ void clear() { if (tagCompressionContext != null) { tagCompressionContext.clear(); } + if (valueCompressor != null) { + valueCompressor.reset(); + } } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 9cd48c056b7a..ce27313a211b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -81,6 +81,7 @@ public class ProtobufLogReader extends ReaderBase { protected WALCellCodec.ByteStringUncompressor byteStringUncompressor; protected boolean hasCompression = false; protected boolean hasTagCompression = false; + protected boolean hasValueCompression = false; // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit // entry in the wal, the inputstream's position is equal to walEditsStopOffset. private long walEditsStopOffset; @@ -227,6 +228,7 @@ private String initInternal(FSDataInputStream stream, boolean isFirst) WALProtos.WALHeader header = builder.build(); this.hasCompression = header.hasHasCompression() && header.getHasCompression(); this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression(); + this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression(); } this.inputStream = stream; this.walEditsStopOffset = this.fileLength; @@ -327,6 +329,11 @@ protected boolean hasTagCompression() { return this.hasTagCompression; } + @Override + protected boolean hasValueCompression() { + return this.hasValueCompression; + } + @Override protected boolean readNext(Entry entry) throws IOException { // OriginalPosition might be < 0 on local fs; if so, it is useless to us. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index 9b6d69a8ef19..c56e408566fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -69,7 +69,7 @@ public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream try { if (compressionContext == null) { compressionContext = new CompressionContext(LRUDictionary.class, - CommonFSUtils.isRecoveredEdits(path), hasTagCompression()); + CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), hasValueCompression()); } else { compressionContext.clear(); } @@ -151,6 +151,11 @@ public void seek(long pos) throws IOException { */ protected abstract boolean hasTagCompression(); + /** + * @return Whether value compression is enabled for this log. + */ + protected abstract boolean hasValueCompression(); + /** * Read next entry. * @param e The entry to read into. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 5aa943f1d84a..834b68253a99 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.zip.DataFormatException; +import java.util.zip.Deflater; +import java.util.zip.Inflater; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -43,7 +46,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.io.IOUtils; - +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; @@ -220,6 +223,8 @@ private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throw } } + static final int VALUE_COMPRESS_THRESHOLD = 100; + static class CompressedKvEncoder extends BaseEncoder { private final CompressionContext compression; public CompressedKvEncoder(OutputStream out, CompressionContext compression) { @@ -241,10 +246,19 @@ public void write(Cell cell) throws IOException { compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY)); PrivateCellUtil.compressQualifier(out, cell, compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER)); - // Write timestamp, type and value as uncompressed. + // Write timestamp, type and value. 
StreamUtils.writeLong(out, cell.getTimestamp()); - out.write(cell.getTypeByte()); - PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); + byte type = cell.getTypeByte(); + if (compression.getValueCompressor() != null && + cell.getValueLength() > VALUE_COMPRESS_THRESHOLD) { + // Set the high bit of type to indicate the value is compressed + out.write((byte)(type|0x80)); + writeCompressedValue(out, cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength(), compression.getValueCompressor()); + } else { + out.write(type); + PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); + } if (tagsLength > 0) { if (compression.tagCompressionContext != null) { // Write tags using Dictionary compression @@ -256,6 +270,28 @@ public void write(Cell cell) throws IOException { } } } + + public static void writeCompressedValue(OutputStream out, byte[] valueArray, int offset, + int vlength, Deflater deflater) throws IOException { + byte[] buffer = new byte[4096]; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + deflater.reset(); + deflater.setInput(valueArray, offset, vlength); + boolean finished = false; + do { + int bytesOut = deflater.deflate(buffer); + if (bytesOut > 0) { + baos.write(buffer, 0, bytesOut); + } else { + bytesOut = deflater.deflate(buffer, 0, buffer.length, Deflater.FULL_FLUSH); + baos.write(buffer, 0, bytesOut); + finished = true; + } + } while (!finished); + StreamUtils.writeRawVInt32(out, baos.size()); + out.write(baos.toByteArray()); + } + } static class CompressedKvDecoder extends BaseDecoder { @@ -269,7 +305,6 @@ public CompressedKvDecoder(InputStream in, CompressionContext compression) { protected Cell parseCell() throws IOException { int keylength = StreamUtils.readRawVarint32(in); int vlength = StreamUtils.readRawVarint32(in); - int tagsLength = StreamUtils.readRawVarint32(in); int length = 0; if(tagsLength == 0) { @@ -302,14 +337,28 @@ protected Cell parseCell() throws IOException { compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER)); pos += elemLen; - // timestamp, type and value - int tsTypeValLen = length - pos; + // timestamp + long ts = StreamUtils.readLong(in); + pos = Bytes.putLong(backingArray, pos, ts); + // type and value + int typeValLen = length - pos; if (tagsLength > 0) { - tsTypeValLen = tsTypeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; + typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; + } + // high bit of type byte is 1 if value is compressed + byte type = (byte)in.read(); + if ((type & 0x80) == 0x80) { + type = (byte)(type & 0x7f); + pos = Bytes.putByte(backingArray, pos, type); + int valLen = typeValLen - 1; + readCompressedValue(in, backingArray, pos, valLen); + pos += valLen; + } else { + pos = Bytes.putByte(backingArray, pos, type); + int valLen = typeValLen - 1; + IOUtils.readFully(in, backingArray, pos, valLen); + pos += valLen; } - IOUtils.readFully(in, backingArray, pos, tsTypeValLen); - pos += tsTypeValLen; - // tags if (tagsLength > 0) { pos = Bytes.putAsShort(backingArray, pos, tagsLength); @@ -349,6 +398,27 @@ private static void checkLength(int len, int max) throws IOException { throw new IOException("Invalid length for compresesed portion of keyvalue: " + len); } } + + public static void readCompressedValue(InputStream in, byte[] outArray, int outOffset, + int expectedLength) throws IOException { + int compressedLength = StreamUtils.readRawVarint32(in); + byte[] buffer = new byte[compressedLength]; + IOUtils.readFully(in, buffer, 0, compressedLength); + Inflater 
inflater = new Inflater(); + inflater.setInput(buffer); + int remaining = expectedLength; + do { + try { + int inflatedBytes = inflater.inflate(outArray, outOffset, remaining); + Preconditions.checkState(inflatedBytes > 0, "Inflater state error"); + outOffset += inflatedBytes; + remaining -= inflatedBytes; + } catch (DataFormatException e) { + throw new IOException(e); + } + } while (remaining > 0); + } + } public static class EnsureKvEncoder extends BaseEncoder { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java index dd9ee697867f..a96553b25560 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -52,34 +53,137 @@ public class TestWALCellCodecWithCompression { @Test public void testEncodeDecodeKVsWithTags() throws Exception { - doTest(false, false); + doTest(false, false, false); } @Test public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception { - doTest(true, false); + doTest(true, false, false); } @Test public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception { - doTest(true, true); + doTest(true, false, true); } - private void doTest(boolean compressTags, boolean offheapKV) throws Exception { + @Test + public void testValueCompressionEnabled() throws Exception { + doTest(false, true, false); + } + + @Test + public void testValueCompression() throws Exception { + final byte[] row_1 = Bytes.toBytes("row_1"); + final byte[] value_1 = new byte[512]; + Bytes.zero(value_1); + final byte[] row_2 = Bytes.toBytes("row_2"); + final byte[] value_2 = new byte[Bytes.SIZEOF_LONG]; + Bytes.random(value_2); + final byte[] row_3 = Bytes.toBytes("row_3"); + final byte[] value_3 = new byte[1024]; + Bytes.random(value_3); + + Configuration conf = new Configuration(false); + WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, + false, true, true)); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Encoder encoder = codec.getEncoder(bos); + encoder.write(createKV(row_1, value_1, 0)); + encoder.write(createKV(row_2, value_2, 0)); + encoder.write(createKV(row_3, value_3, 0)); + encoder.flush(); + + try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) { + Decoder decoder = codec.getDecoder(is); + decoder.advance(); + KeyValue kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_1, 0, row_1.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_1, 0, value_1.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_2, 0, row_2.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_2, 0, value_2.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_3, 0, row_3.length, + kv.getRowArray(), kv.getRowOffset(), 
kv.getRowLength())); + assertTrue(Bytes.equals(value_3, 0, value_3.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + } + } + + @Test + public void testValueCompressionCompatibility() throws Exception { + final byte[] row_1 = Bytes.toBytes("row_1"); + final byte[] value_1 = new byte[512]; + Bytes.zero(value_1); + final byte[] row_2 = Bytes.toBytes("row_2"); + final byte[] value_2 = new byte[Bytes.SIZEOF_LONG]; + Bytes.random(value_2); + final byte[] row_3 = Bytes.toBytes("row_3"); + final byte[] value_3 = new byte[1024]; + Bytes.random(value_3); + + Configuration conf = new Configuration(false); + WALCellCodec outCodec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, + false, false, false)); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + Encoder encoder = outCodec.getEncoder(bos); + encoder.write(createKV(row_1, value_1, 0)); + encoder.write(createKV(row_2, value_2, 0)); + encoder.write(createKV(row_3, value_3, 0)); + encoder.flush(); + + byte[] encodedCells = bos.toByteArray(); + + WALCellCodec inCodec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, + false, false, true)); + try (InputStream is = new ByteArrayInputStream(encodedCells)) { + Decoder decoder = inCodec.getDecoder(is); + decoder.advance(); + KeyValue kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_1, 0, row_1.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_1, 0, value_1.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_2, 0, row_2.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_2, 0, value_2.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_3, 0, row_3.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_3, 0, value_3.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + } + } + + private void doTest(boolean compressTags, boolean compressValue, boolean offheapKV) + throws Exception { + final byte[] key = Bytes.toBytes("myRow"); + final byte[] value = Bytes.toBytes("myValue"); Configuration conf = new Configuration(false); conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags); - WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false, - compressTags)); + conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, compressValue); + WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, + false, compressTags, compressValue)); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); Encoder encoder = codec.getEncoder(bos); if (offheapKV) { - encoder.write(createOffheapKV(1)); - encoder.write(createOffheapKV(0)); - encoder.write(createOffheapKV(2)); + encoder.write(createOffheapKV(key, value, 1)); + encoder.write(createOffheapKV(key, value, 0)); + encoder.write(createOffheapKV(key, value, 2)); } else { - encoder.write(createKV(1)); - encoder.write(createKV(0)); - encoder.write(createKV(2)); + encoder.write(createKV(key, value, 1)); + encoder.write(createKV(key, value, 0)); + encoder.write(createKV(key, value, 2)); } InputStream is = new ByteArrayInputStream(bos.toByteArray()); @@ -101,11 +205,9 @@ private void doTest(boolean compressTags, 
boolean offheapKV) throws Exception { assertEquals("tagValue2", Bytes.toString(Tag.cloneValue(tags.get(1)))); } - private KeyValue createKV(int noOfTags) { - byte[] row = Bytes.toBytes("myRow"); + private KeyValue createKV(byte[] row, byte[] value, int noOfTags) { byte[] cf = Bytes.toBytes("myCF"); byte[] q = Bytes.toBytes("myQualifier"); - byte[] value = Bytes.toBytes("myValue"); List tags = new ArrayList<>(noOfTags); for (int i = 1; i <= noOfTags; i++) { tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i))); @@ -113,11 +215,9 @@ private KeyValue createKV(int noOfTags) { return new KeyValue(row, cf, q, HConstants.LATEST_TIMESTAMP, value, tags); } - private ByteBufferKeyValue createOffheapKV(int noOfTags) { - byte[] row = Bytes.toBytes("myRow"); + private ByteBufferKeyValue createOffheapKV(byte[] row, byte[] value, int noOfTags) { byte[] cf = Bytes.toBytes("myCF"); byte[] q = Bytes.toBytes("myQualifier"); - byte[] value = Bytes.toBytes("myValue"); List tags = new ArrayList<>(noOfTags); for (int i = 1; i <= noOfTags; i++) { tags.add(new ArrayBackedTag((byte) i, Bytes.toBytes("tagValue" + i))); From 34698a6a8fb076242fea9a08c8e503ef0c5dd1ec Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Sat, 8 May 2021 12:41:02 -0700 Subject: [PATCH 02/12] - We were fully flushing the deflater at the end of every value (FULL_FLUSH). This was very conservative: it allowed each value to be decompressed individually (and so was resilient to corruption), but it seriously hurt the compression ratio. Meanwhile, our custom dictionary scheme accumulates strings over the whole file, so we were being conservative in value compression in a way our custom scheme defeats anyway. We can instead let the Deflater build its dictionary over all values in the WAL, using SYNC_FLUSH instead of FULL_FLUSH. - Only store compressed values when we have achieved space savings. - Add more unit tests.
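For illustration only, the per-value deflate step now behaves roughly like the sketch below. This is a simplified view of what WALCellCodec#compressValue does in this patch; the class and method names, the 4096-byte buffer, and the omission of error handling and of flush output larger than the buffer are illustrative assumptions, not the actual patch code. The key point is that one Deflater is shared across all values in the WAL and is only SYNC_FLUSHed between values, so its history window keeps accumulating:

    import java.io.ByteArrayOutputStream;
    import java.util.zip.Deflater;

    class ValueDeflateSketch {
      // One Deflater per WAL, tuned for speed; reused for every value so that
      // earlier values prime the compressor's dictionary for later ones.
      private final Deflater deflater = new Deflater(Deflater.BEST_SPEED);

      byte[] compressOneValue(byte[] value, int off, int len) {
        deflater.setInput(value, off, len);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        // deflate() returns 0 only when it needs more input.
        while ((n = deflater.deflate(buf)) > 0) {
          baos.write(buf, 0, n);
        }
        // SYNC_FLUSH frames this value's compressed output so it can be decoded
        // on read, but unlike FULL_FLUSH it does not reset the dictionary.
        n = deflater.deflate(buf, 0, buf.length, Deflater.SYNC_FLUSH);
        baos.write(buf, 0, n);
        return baos.toByteArray();
      }
    }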
--- .../regionserver/wal/CompressionContext.java | 39 +++++++-- .../hbase/regionserver/wal/WALCellCodec.java | 35 ++++---- .../wal/TestWALCellCodecWithCompression.java | 85 +++++++++++-------- .../wal/TestWALReplayValueCompression.java | 46 ++++++++++ .../wal/TestWALSplitValueCompression.java | 44 ++++++++++ 5 files changed, 189 insertions(+), 60 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 5ed7a636402a..d2a46eccc0c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -23,6 +23,7 @@ import java.util.EnumMap; import java.util.Map; import java.util.zip.Deflater; +import java.util.zip.Inflater; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; @@ -35,21 +36,45 @@ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) public class CompressionContext { - static final String ENABLE_WAL_TAGS_COMPRESSION = + public static final String ENABLE_WAL_TAGS_COMPRESSION = "hbase.regionserver.wal.tags.enablecompression"; - static final String ENABLE_WAL_VALUE_COMPRESSION = + public static final String ENABLE_WAL_VALUE_COMPRESSION = "hbase.regionserver.wal.value.enablecompression"; public enum DictionaryIndex { REGION, TABLE, FAMILY, QUALIFIER, ROW } + static class ValueCompressor { + final Deflater deflater; + final Inflater inflater; + + public ValueCompressor() { + deflater = new Deflater(); + deflater.setLevel(Deflater.BEST_SPEED); + inflater = new Inflater(); + } + + public Deflater getDeflater() { + return deflater; + } + + public Inflater getInflater() { + return inflater; + } + + public void clear() { + deflater.reset(); + inflater.reset(); + } + }; + private final Map dictionaries = new EnumMap<>(DictionaryIndex.class); // Context used for compressing tags TagCompressionContext tagCompressionContext = null; - Deflater valueCompressor = null; + ValueCompressor valueCompressor = null; public CompressionContext(Class dictType, boolean recoveredEdits, boolean hasTagCompression, boolean hasValueCompression) @@ -77,9 +102,7 @@ public CompressionContext(Class dictType, boolean recovere tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE); } if (hasValueCompression) { - valueCompressor = new Deflater(); - // Optimize for encoding speed - valueCompressor.setLevel(Deflater.BEST_SPEED); + valueCompressor = new ValueCompressor(); } } @@ -94,7 +117,7 @@ public Dictionary getDictionary(Enum dictIndex) { return dictionaries.get(dictIndex); } - public Deflater getValueCompressor() { + public ValueCompressor getValueCompressor() { return valueCompressor; } @@ -106,7 +129,7 @@ void clear() { tagCompressionContext.clear(); } if (valueCompressor != null) { - valueCompressor.reset(); + valueCompressor.clear(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 834b68253a99..f80d19d7ca1e 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.io.IOUtils; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; @@ -251,10 +251,18 @@ public void write(Cell cell) throws IOException { byte type = cell.getTypeByte(); if (compression.getValueCompressor() != null && cell.getValueLength() > VALUE_COMPRESS_THRESHOLD) { - // Set the high bit of type to indicate the value is compressed - out.write((byte)(type|0x80)); - writeCompressedValue(out, cell.getValueArray(), cell.getValueOffset(), - cell.getValueLength(), compression.getValueCompressor()); + // Try compressing the cell's value + byte[] compressedBytes = compressValue(cell); + // Only write the compressed value if we have achieved some space savings. + if (compressedBytes.length < cell.getValueLength()) { + // Set the high bit of type to indicate the value is compressed + out.write((byte)(type|0x80)); + StreamUtils.writeRawVInt32(out, compressedBytes.length); + out.write(compressedBytes); + } else { + out.write(type); + PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); + } } else { out.write(type); PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); @@ -271,25 +279,23 @@ public void write(Cell cell) throws IOException { } } - public static void writeCompressedValue(OutputStream out, byte[] valueArray, int offset, - int vlength, Deflater deflater) throws IOException { + private byte[] compressValue(Cell cell) throws IOException { byte[] buffer = new byte[4096]; ByteArrayOutputStream baos = new ByteArrayOutputStream(); - deflater.reset(); - deflater.setInput(valueArray, offset, vlength); + Deflater deflater = compression.getValueCompressor().getDeflater(); + deflater.setInput(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); boolean finished = false; do { int bytesOut = deflater.deflate(buffer); if (bytesOut > 0) { baos.write(buffer, 0, bytesOut); } else { - bytesOut = deflater.deflate(buffer, 0, buffer.length, Deflater.FULL_FLUSH); + bytesOut = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH); baos.write(buffer, 0, bytesOut); finished = true; } } while (!finished); - StreamUtils.writeRawVInt32(out, baos.size()); - out.write(baos.toByteArray()); + return baos.toByteArray(); } } @@ -399,18 +405,17 @@ private static void checkLength(int len, int max) throws IOException { } } - public static void readCompressedValue(InputStream in, byte[] outArray, int outOffset, + private void readCompressedValue(InputStream in, byte[] outArray, int outOffset, int expectedLength) throws IOException { int compressedLength = StreamUtils.readRawVarint32(in); byte[] buffer = new byte[compressedLength]; IOUtils.readFully(in, buffer, 0, compressedLength); - Inflater inflater = new Inflater(); + Inflater inflater = compression.getValueCompressor().getInflater(); inflater.setInput(buffer); int remaining = expectedLength; do { try { int inflatedBytes = inflater.inflate(outArray, outOffset, remaining); - Preconditions.checkState(inflatedBytes > 0, "Inflater state error"); outOffset += inflatedBytes; remaining -= inflatedBytes; } catch (DataFormatException e) { diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java index a96553b25560..0b3c583e9bb1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java @@ -26,6 +26,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.ByteBufferKeyValue; @@ -74,14 +75,20 @@ public void testValueCompressionEnabled() throws Exception { @Test public void testValueCompression() throws Exception { final byte[] row_1 = Bytes.toBytes("row_1"); - final byte[] value_1 = new byte[512]; + final byte[] value_1 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD]; Bytes.zero(value_1); final byte[] row_2 = Bytes.toBytes("row_2"); final byte[] value_2 = new byte[Bytes.SIZEOF_LONG]; Bytes.random(value_2); final byte[] row_3 = Bytes.toBytes("row_3"); - final byte[] value_3 = new byte[1024]; + final byte[] value_3 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD]; Bytes.random(value_3); + final byte[] row_4 = Bytes.toBytes("row_4"); + final byte[] value_4 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD * 4]; + fillBytes(value_4, Bytes.toBytes("DEADBEEF")); + final byte[] row_5 = Bytes.toBytes("row_5"); + final byte[] value_5 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD * 2]; + fillBytes(value_5, Bytes.toBytes("CAFEBABE")); Configuration conf = new Configuration(false); WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, @@ -91,6 +98,8 @@ public void testValueCompression() throws Exception { encoder.write(createKV(row_1, value_1, 0)); encoder.write(createKV(row_2, value_2, 0)); encoder.write(createKV(row_3, value_3, 0)); + encoder.write(createKV(row_4, value_4, 0)); + encoder.write(createKV(row_5, value_5, 0)); encoder.flush(); try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) { @@ -113,55 +122,57 @@ public void testValueCompression() throws Exception { kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); assertTrue(Bytes.equals(value_3, 0, value_3.length, kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_4, 0, row_4.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_4, 0, value_4.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + decoder.advance(); + kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(row_5, 0, row_5.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value_5, 0, value_5.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + } + } + + static void fillBytes(byte[] buffer, byte[] fill) { + int offset = 0; + int remaining = buffer.length; + while (remaining > 0) { + int len = remaining < fill.length ? 
remaining : fill.length; + System.arraycopy(fill, 0, buffer, offset, len); + offset += len; + remaining -= len; } } @Test public void testValueCompressionCompatibility() throws Exception { - final byte[] row_1 = Bytes.toBytes("row_1"); - final byte[] value_1 = new byte[512]; - Bytes.zero(value_1); - final byte[] row_2 = Bytes.toBytes("row_2"); - final byte[] value_2 = new byte[Bytes.SIZEOF_LONG]; - Bytes.random(value_2); - final byte[] row_3 = Bytes.toBytes("row_3"); - final byte[] value_3 = new byte[1024]; - Bytes.random(value_3); - + final byte[] key = Bytes.toBytes("myRow"); + final byte[] value = Bytes.toBytes("myValue"); Configuration conf = new Configuration(false); WALCellCodec outCodec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false, false, false)); ByteArrayOutputStream bos = new ByteArrayOutputStream(); Encoder encoder = outCodec.getEncoder(bos); - encoder.write(createKV(row_1, value_1, 0)); - encoder.write(createKV(row_2, value_2, 0)); - encoder.write(createKV(row_3, value_3, 0)); + for (int i = 0; i < 10; i++) { + encoder.write(createOffheapKV(key, value, 0)); + } encoder.flush(); - - byte[] encodedCells = bos.toByteArray(); - WALCellCodec inCodec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false, false, true)); - try (InputStream is = new ByteArrayInputStream(encodedCells)) { + try (InputStream is = new ByteArrayInputStream(bos.toByteArray())) { Decoder decoder = inCodec.getDecoder(is); - decoder.advance(); - KeyValue kv = (KeyValue) decoder.current(); - assertTrue(Bytes.equals(row_1, 0, row_1.length, - kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); - assertTrue(Bytes.equals(value_1, 0, value_1.length, - kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); - decoder.advance(); - kv = (KeyValue) decoder.current(); - assertTrue(Bytes.equals(row_2, 0, row_2.length, - kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); - assertTrue(Bytes.equals(value_2, 0, value_2.length, - kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); - decoder.advance(); - kv = (KeyValue) decoder.current(); - assertTrue(Bytes.equals(row_3, 0, row_3.length, - kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); - assertTrue(Bytes.equals(value_3, 0, value_3.length, - kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + for (int i = 0 ; i < 10; i++) { + decoder.advance(); + KeyValue kv = (KeyValue) decoder.current(); + assertTrue(Bytes.equals(key, 0, key.length, + kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); + assertTrue(Bytes.equals(value, 0, value.length, + kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java new file mode 100644 index 000000000000..d10cc9c73540 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayValueCompression.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +/** + * Enables compression and runs the TestWALReplay tests. + */ +@Category({ RegionServerTests.class, LargeTests.class }) +public class TestWALReplayValueCompression extends TestWALReplay { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALReplayValueCompression.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + TestWALReplay.setUpBeforeClass(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java new file mode 100644 index 000000000000..fe8a12098a84 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({RegionServerTests.class, LargeTests.class}) +public class TestWALSplitValueCompression extends TestWALSplit { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALSplitValueCompression.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestWALSplit.setUpBeforeClass(); + TEST_UTIL.getConfiguration() + .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + TEST_UTIL.getConfiguration() + .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + } +} From f678ce6fd73fc5269e62d5bae56fea7c0ec4eea4 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Mon, 10 May 2021 19:30:18 -0700 Subject: [PATCH 03/12] Address first round of review feedback - Unconditionally compress values if value compression is enabled. Avoids leakage of WALCellCodec details into KeyValue and other places. - Add comments to WALCellCodec. - Add javadoc to CompressionContext. --- .../org/apache/hadoop/hbase/KeyValue.java | 7 -- .../regionserver/wal/CompressionContext.java | 19 ++++- .../hbase/regionserver/wal/WALCellCodec.java | 79 +++++++++++-------- .../wal/TestWALCellCodecWithCompression.java | 8 +- 4 files changed, 65 insertions(+), 48 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 431585153503..5dafa76feb05 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -76,8 +76,6 @@ */ @InterfaceAudience.Private public class KeyValue implements ExtendedCell, Cloneable { - private static final ArrayList EMPTY_ARRAY_LIST = new ArrayList<>(); - private static final Logger LOG = LoggerFactory.getLogger(KeyValue.class); public static final int FIXED_OVERHEAD = ClassSize.OBJECT + // the KeyValue object itself @@ -230,11 +228,6 @@ public static enum Type { DeleteColumn((byte)12), DeleteFamily((byte)14), - // Effective maximum is 127 (Byte.MAX_VALUE). We set the high order bit of the - // type byte in the WAL codecs to indicate, in a backwards compatible way, if the - // value is compressed there. - EffectiveMaximum((byte)Byte.MAX_VALUE), - // Maximum is used when searching; you look from maximum on down. 
Maximum((byte)255); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index d2a46eccc0c7..6c2d650e4702 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -37,21 +37,26 @@ public class CompressionContext { public static final String ENABLE_WAL_TAGS_COMPRESSION = - "hbase.regionserver.wal.tags.enablecompression"; + "hbase.regionserver.wal.tags.enablecompression"; public static final String ENABLE_WAL_VALUE_COMPRESSION = - "hbase.regionserver.wal.value.enablecompression"; + "hbase.regionserver.wal.value.enablecompression"; public enum DictionaryIndex { REGION, TABLE, FAMILY, QUALIFIER, ROW } + /** + * Encapsulates the zlib deflater/inflater pair we will use for value compression in this WAL. + */ static class ValueCompressor { final Deflater deflater; final Inflater inflater; public ValueCompressor() { deflater = new Deflater(); + // Optimize for speed so we minimize the time spent writing the WAL. This still achieves + // quite good results. (This is not really user serviceable.) deflater.setLevel(Deflater.BEST_SPEED); inflater = new Inflater(); } @@ -113,7 +118,15 @@ public CompressionContext(Class dictType, boolean recovere this(dictType, recoveredEdits, hasTagCompression, false); } - public Dictionary getDictionary(Enum dictIndex) { + public boolean hasTagCompression() { + return tagCompressionContext != null; + } + + public boolean hasValueCompression() { + return valueCompressor != null; + } + + public Dictionary getDictionary(Enum dictIndex) { return dictionaries.get(dictIndex); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index f80d19d7ca1e..49c54e26ee15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -26,6 +26,7 @@ import java.util.zip.Inflater; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ByteBufferExtendedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.PrivateCellUtil; @@ -223,8 +224,6 @@ private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throw } } - static final int VALUE_COMPRESS_THRESHOLD = 100; - static class CompressedKvEncoder extends BaseEncoder { private final CompressionContext compression; public CompressedKvEncoder(OutputStream out, CompressionContext compression) { @@ -249,22 +248,12 @@ public void write(Cell cell) throws IOException { // Write timestamp, type and value. StreamUtils.writeLong(out, cell.getTimestamp()); byte type = cell.getTypeByte(); - if (compression.getValueCompressor() != null && - cell.getValueLength() > VALUE_COMPRESS_THRESHOLD) { - // Try compressing the cell's value + out.write(type); + if (compression.getValueCompressor() != null) { byte[] compressedBytes = compressValue(cell); - // Only write the compressed value if we have achieved some space savings. 
- if (compressedBytes.length < cell.getValueLength()) { - // Set the high bit of type to indicate the value is compressed - out.write((byte)(type|0x80)); - StreamUtils.writeRawVInt32(out, compressedBytes.length); - out.write(compressedBytes); - } else { - out.write(type); - PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); - } + StreamUtils.writeRawVInt32(out, compressedBytes.length); + out.write(compressedBytes); } else { - out.write(type); PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); } if (tagsLength > 0) { @@ -280,21 +269,38 @@ public void write(Cell cell) throws IOException { } private byte[] compressValue(Cell cell) throws IOException { - byte[] buffer = new byte[4096]; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); Deflater deflater = compression.getValueCompressor().getDeflater(); - deflater.setInput(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - boolean finished = false; + if (cell instanceof ByteBufferExtendedCell) { + deflater.setInput(((ByteBufferExtendedCell)cell).getValueByteBuffer().array(), + ((ByteBufferExtendedCell)cell).getValueByteBuffer().arrayOffset() + + ((ByteBufferExtendedCell)cell).getValuePosition(), + cell.getValueLength()); + } else { + deflater.setInput(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + // Deflater#deflate will return 0 only if more input is required. We iterate until + // that condition is met, sending the content of 'buffer' to the output stream at + // each step, until deflate returns 0. Then the compressor must be flushed in order + // for all of the value's output to be written into the corresponding edit. (Otherwise + // the compressor would carry over some of the output for this value into the output + // of the next.) To flush the compressor we call deflate again using the method option + // that allows us to specify the SYNC_FLUSH flag. The sync output will be placed into + // the buffer. When flushing we iterate until there is no more output. Then the flush + // is complete and the compressor is ready for more input. 
+ int bytesOut; do { - int bytesOut = deflater.deflate(buffer); + bytesOut = deflater.deflate(buffer); if (bytesOut > 0) { baos.write(buffer, 0, bytesOut); - } else { - bytesOut = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH); - baos.write(buffer, 0, bytesOut); - finished = true; } - } while (!finished); + } while (bytesOut > 0); + // Done compressing value, now flush until deflater buffers are empty + do { + bytesOut = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH); + baos.write(buffer, 0, bytesOut); + } while (bytesOut == buffer.length); // See javadoc for Deflater#deflate return baos.toByteArray(); } @@ -351,24 +357,20 @@ protected Cell parseCell() throws IOException { if (tagsLength > 0) { typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; } - // high bit of type byte is 1 if value is compressed byte type = (byte)in.read(); - if ((type & 0x80) == 0x80) { - type = (byte)(type & 0x7f); - pos = Bytes.putByte(backingArray, pos, type); - int valLen = typeValLen - 1; + pos = Bytes.putByte(backingArray, pos, type); + int valLen = typeValLen - 1; + if (compression.hasValueCompression()) { readCompressedValue(in, backingArray, pos, valLen); pos += valLen; } else { - pos = Bytes.putByte(backingArray, pos, type); - int valLen = typeValLen - 1; IOUtils.readFully(in, backingArray, pos, valLen); pos += valLen; } // tags if (tagsLength > 0) { pos = Bytes.putAsShort(backingArray, pos, tagsLength); - if (compression.tagCompressionContext != null) { + if (compression.hasTagCompression()) { compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength); } else { IOUtils.readFully(in, backingArray, pos, tagsLength); @@ -407,15 +409,24 @@ private static void checkLength(int len, int max) throws IOException { private void readCompressedValue(InputStream in, byte[] outArray, int outOffset, int expectedLength) throws IOException { + // Read the size of the compressed value. We serialized it as a vint32. int compressedLength = StreamUtils.readRawVarint32(in); + // Read all of the compressed value into a buffer for the Inflater. byte[] buffer = new byte[compressedLength]; IOUtils.readFully(in, buffer, 0, compressedLength); + // Inflate the compressed value. We know the uncompressed size. Inflator#inflate will + // return nonzero for as long as some compressed input remains, and 0 when done. + // We have the advantage of knowing the expected length of the uncompressed value, so + // can stop inflating then. 
Inflater inflater = compression.getValueCompressor().getInflater(); inflater.setInput(buffer); int remaining = expectedLength; do { try { int inflatedBytes = inflater.inflate(outArray, outOffset, remaining); + if (inflatedBytes == 0) { + throw new IOException("Inflater state error"); + } outOffset += inflatedBytes; remaining -= inflatedBytes; } catch (DataFormatException e) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java index 0b3c583e9bb1..a3e5e127c10d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java @@ -75,19 +75,19 @@ public void testValueCompressionEnabled() throws Exception { @Test public void testValueCompression() throws Exception { final byte[] row_1 = Bytes.toBytes("row_1"); - final byte[] value_1 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD]; + final byte[] value_1 = new byte[20]; Bytes.zero(value_1); final byte[] row_2 = Bytes.toBytes("row_2"); final byte[] value_2 = new byte[Bytes.SIZEOF_LONG]; Bytes.random(value_2); final byte[] row_3 = Bytes.toBytes("row_3"); - final byte[] value_3 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD]; + final byte[] value_3 = new byte[100]; Bytes.random(value_3); final byte[] row_4 = Bytes.toBytes("row_4"); - final byte[] value_4 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD * 4]; + final byte[] value_4 = new byte[128]; fillBytes(value_4, Bytes.toBytes("DEADBEEF")); final byte[] row_5 = Bytes.toBytes("row_5"); - final byte[] value_5 = new byte[WALCellCodec.VALUE_COMPRESS_THRESHOLD * 2]; + final byte[] value_5 = new byte[64]; fillBytes(value_5, Bytes.toBytes("CAFEBABE")); Configuration conf = new Configuration(false); From 1924a4f16d78821299db3c556ac3d0900b4f5b1c Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 11 May 2021 09:17:02 -0700 Subject: [PATCH 04/12] Remove invalid unit test after latest changes --- .../wal/TestWALCellCodecWithCompression.java | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java index a3e5e127c10d..afc9113adae5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java @@ -148,34 +148,6 @@ static void fillBytes(byte[] buffer, byte[] fill) { } } - @Test - public void testValueCompressionCompatibility() throws Exception { - final byte[] key = Bytes.toBytes("myRow"); - final byte[] value = Bytes.toBytes("myValue"); - Configuration conf = new Configuration(false); - WALCellCodec outCodec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, - false, false, false)); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Encoder encoder = outCodec.getEncoder(bos); - for (int i = 0; i < 10; i++) { - encoder.write(createOffheapKV(key, value, 0)); - } - encoder.flush(); - WALCellCodec inCodec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, - false, false, true)); - try (InputStream is = new 
ByteArrayInputStream(bos.toByteArray())) { - Decoder decoder = inCodec.getDecoder(is); - for (int i = 0 ; i < 10; i++) { - decoder.advance(); - KeyValue kv = (KeyValue) decoder.current(); - assertTrue(Bytes.equals(key, 0, key.length, - kv.getRowArray(), kv.getRowOffset(), kv.getRowLength())); - assertTrue(Bytes.equals(value, 0, value.length, - kv.getValueArray(), kv.getValueOffset(), kv.getValueLength())); - } - } - } - private void doTest(boolean compressTags, boolean compressValue, boolean offheapKV) throws Exception { final byte[] key = Bytes.toBytes("myRow"); From 7f20cd7cccee129b74289ab89c302781c759de14 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 11 May 2021 09:24:38 -0700 Subject: [PATCH 05/12] WALCellCodec#compressValue should properly handle the case where our buffer may currently be too small to receive deflater flush output. Add more assertions that will throw IOEs if either the deflater or inflater gets into an unexpected state. More unit test coverage. --- .../regionserver/wal/CompressionContext.java | 25 ++++++++ .../hbase/regionserver/wal/WALCellCodec.java | 61 ++++++++++++------- .../TestAsyncWALReplayValueCompression.java | 43 +++++++++++++ .../wal/TestWALCellCodecWithCompression.java | 4 +- .../wal/TestWALSplitValueCompression.java | 2 +- 5 files changed, 111 insertions(+), 24 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 6c2d650e4702..41ec0e9a1a2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -50,8 +50,12 @@ public enum DictionaryIndex { * Encapsulates the zlib deflater/inflater pair we will use for value compression in this WAL. 
*/ static class ValueCompressor { + final int DEFAULT_DEFLATE_BUFFER_SIZE = 8*1024; + final int MAX_DEFLATE_BUFFER_SIZE = 256*1024; + final Deflater deflater; final Inflater inflater; + byte[] deflateBuffer; public ValueCompressor() { deflater = new Deflater(); @@ -65,6 +69,25 @@ public Deflater getDeflater() { return deflater; } + public byte[] getDeflateBuffer() { + if (deflateBuffer == null) { + deflateBuffer = new byte[DEFAULT_DEFLATE_BUFFER_SIZE]; + } + return deflateBuffer; + } + + public int getDeflateBufferSize() { + return deflateBuffer.length; + } + + public void setDeflateBufferSize(int size) { + if (size > MAX_DEFLATE_BUFFER_SIZE) { + throw new IllegalArgumentException("Requested buffer size is too large, ask=" + size + + ", max=" + MAX_DEFLATE_BUFFER_SIZE); + } + deflateBuffer = new byte[size]; + } + public Inflater getInflater() { return inflater; } @@ -72,7 +95,9 @@ public Inflater getInflater() { public void clear() { deflater.reset(); inflater.reset(); + deflateBuffer = null; } + }; private final Map dictionaries = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 49c54e26ee15..b0365880b2c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -26,7 +26,6 @@ import java.util.zip.Inflater; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ByteBufferExtendedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.PrivateCellUtil; @@ -43,6 +42,7 @@ import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.regionserver.wal.CompressionContext.ValueCompressor; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -269,17 +269,10 @@ public void write(Cell cell) throws IOException { } private byte[] compressValue(Cell cell) throws IOException { - Deflater deflater = compression.getValueCompressor().getDeflater(); - if (cell instanceof ByteBufferExtendedCell) { - deflater.setInput(((ByteBufferExtendedCell)cell).getValueByteBuffer().array(), - ((ByteBufferExtendedCell)cell).getValueByteBuffer().arrayOffset() + - ((ByteBufferExtendedCell)cell).getValuePosition(), - cell.getValueLength()); - } else { - deflater.setInput(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - } + ValueCompressor valueCompressor = compression.getValueCompressor(); + Deflater deflater = valueCompressor.getDeflater(); + deflater.setInput(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte[] buffer = new byte[1024]; // Deflater#deflate will return 0 only if more input is required. We iterate until // that condition is met, sending the content of 'buffer' to the output stream at // each step, until deflate returns 0. Then the compressor must be flushed in order @@ -291,16 +284,35 @@ private byte[] compressValue(Cell cell) throws IOException { // is complete and the compressor is ready for more input. 
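+ // (For reference: a SYNC_FLUSH ends the current deflate block and appends the empty
+ // stored block marker 00 00 FF FF, so everything written for this value can be inflated
+ // immediately on the read side while dictionary state is retained for the next value.)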
int bytesOut; do { - bytesOut = deflater.deflate(buffer); + bytesOut = deflater.deflate(valueCompressor.getDeflateBuffer()); if (bytesOut > 0) { - baos.write(buffer, 0, bytesOut); + baos.write(valueCompressor.getDeflateBuffer(), 0, bytesOut); } } while (bytesOut > 0); - // Done compressing value, now flush until deflater buffers are empty + // Done compressing value, now flush until deflater buffers are empty. + // If we don't have enough space in the buffer to fully flush, the buffer must be + // resized. + boolean finished = false; do { - bytesOut = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH); - baos.write(buffer, 0, bytesOut); - } while (bytesOut == buffer.length); // See javadoc for Deflater#deflate + bytesOut = deflater.deflate(valueCompressor.getDeflateBuffer(), 0, + valueCompressor.getDeflateBufferSize(), Deflater.SYNC_FLUSH); + if (bytesOut == 0) { + throw new IOException("Deflater state error: SYNC_FLUSH did not flush"); + } + if (bytesOut == valueCompressor.getDeflateBufferSize()) { + // Resize the output buffer. + // If we eventually ask for a buffer size that is too large, setDeflateBufferSize + // will throw an IllegalArgumentException. + try { + valueCompressor.setDeflateBufferSize(valueCompressor.getDeflateBufferSize() * 2); + } catch (IllegalArgumentException e) { + throw new IOException("Deflater state error: exceeded max deflate buffer size", e); + } + } else { + baos.write(valueCompressor.getDeflateBuffer(), 0, bytesOut); + finished = true; + } + } while (!finished); return baos.toByteArray(); } @@ -416,23 +428,30 @@ private void readCompressedValue(InputStream in, byte[] outArray, int outOffset, IOUtils.readFully(in, buffer, 0, compressedLength); // Inflate the compressed value. We know the uncompressed size. Inflator#inflate will // return nonzero for as long as some compressed input remains, and 0 when done. - // We have the advantage of knowing the expected length of the uncompressed value, so - // can stop inflating then. Inflater inflater = compression.getValueCompressor().getInflater(); inflater.setInput(buffer); int remaining = expectedLength; + boolean finished = false; do { try { int inflatedBytes = inflater.inflate(outArray, outOffset, remaining); if (inflatedBytes == 0) { - throw new IOException("Inflater state error"); + finished = true; } outOffset += inflatedBytes; remaining -= inflatedBytes; + if (remaining == 0) { + finished = true; + } else if (remaining < 0) { + throw new IOException("Inflater state error: 'remaining' went negative"); + } } catch (DataFormatException e) { throw new IOException(e); } - } while (remaining > 0); + } while (!finished); + if (remaining > 0) { + throw new IOException("Inflater state error: inflator finished early"); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java new file mode 100644 index 000000000000..cbe1faa65d48 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayValueCompression.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestAsyncWALReplayValueCompression extends TestAsyncWALReplay { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncWALReplayValueCompression.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + TestAsyncWALReplay.setUpBeforeClass(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java index afc9113adae5..22570310e7f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java @@ -96,9 +96,9 @@ public void testValueCompression() throws Exception { ByteArrayOutputStream bos = new ByteArrayOutputStream(); Encoder encoder = codec.getEncoder(bos); encoder.write(createKV(row_1, value_1, 0)); - encoder.write(createKV(row_2, value_2, 0)); + encoder.write(createOffheapKV(row_2, value_2, 0)); encoder.write(createKV(row_3, value_3, 0)); - encoder.write(createKV(row_4, value_4, 0)); + encoder.write(createOffheapKV(row_4, value_4, 0)); encoder.write(createKV(row_5, value_5, 0)); encoder.flush(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java index fe8a12098a84..32ed85f6bba8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitValueCompression.java @@ -35,10 +35,10 @@ public class TestWALSplitValueCompression extends TestWALSplit { @BeforeClass public static void setUpBeforeClass() throws Exception { - TestWALSplit.setUpBeforeClass(); TEST_UTIL.getConfiguration() .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); TEST_UTIL.getConfiguration() .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + TestWALSplit.setUpBeforeClass(); } } From 86f7a57629aac7420ba4e386fbfd1e7072de06cb Mon Sep 17 00:00:00 2001 
From: Andrew Purtell Date: Tue, 11 May 2021 19:11:06 -0700 Subject: [PATCH 06/12] Address another round of review feedback Fix findbugs warnings --- .../regionserver/wal/CompressionContext.java | 4 ++-- .../hbase/regionserver/wal/WALCellCodec.java | 19 +++++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 41ec0e9a1a2b..9372819f9b74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -50,8 +50,8 @@ public enum DictionaryIndex { * Encapsulates the zlib deflater/inflater pair we will use for value compression in this WAL. */ static class ValueCompressor { - final int DEFAULT_DEFLATE_BUFFER_SIZE = 8*1024; - final int MAX_DEFLATE_BUFFER_SIZE = 256*1024; + final static int DEFAULT_DEFLATE_BUFFER_SIZE = 8*1024; + final static int MAX_DEFLATE_BUFFER_SIZE = 256*1024; final Deflater deflater; final Inflater inflater; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index b0365880b2c5..5ec2adb8bf98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -226,9 +226,13 @@ private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throw static class CompressedKvEncoder extends BaseEncoder { private final CompressionContext compression; + private final boolean hasValueCompression; + private final boolean hasTagCompression; public CompressedKvEncoder(OutputStream out, CompressionContext compression) { super(out); this.compression = compression; + this.hasValueCompression = compression.hasValueCompression(); + this.hasTagCompression = compression.hasTagCompression(); } @Override @@ -247,9 +251,8 @@ public void write(Cell cell) throws IOException { compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER)); // Write timestamp, type and value. 
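+ // The timestamp and type byte are always written uncompressed; when value compression is
+ // enabled only the value bytes that follow are routed through the value compressor.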
StreamUtils.writeLong(out, cell.getTimestamp()); - byte type = cell.getTypeByte(); - out.write(type); - if (compression.getValueCompressor() != null) { + out.write(cell.getTypeByte()); + if (hasValueCompression) { byte[] compressedBytes = compressValue(cell); StreamUtils.writeRawVInt32(out, compressedBytes.length); out.write(compressedBytes); @@ -257,7 +260,7 @@ public void write(Cell cell) throws IOException { PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); } if (tagsLength > 0) { - if (compression.tagCompressionContext != null) { + if (hasTagCompression) { // Write tags using Dictionary compression PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext); } else { @@ -320,9 +323,13 @@ private byte[] compressValue(Cell cell) throws IOException { static class CompressedKvDecoder extends BaseDecoder { private final CompressionContext compression; + private final boolean hasValueCompression; + private final boolean hasTagCompression; public CompressedKvDecoder(InputStream in, CompressionContext compression) { super(in); this.compression = compression; + this.hasValueCompression = compression.hasValueCompression(); + this.hasTagCompression = compression.hasTagCompression(); } @Override @@ -372,7 +379,7 @@ protected Cell parseCell() throws IOException { byte type = (byte)in.read(); pos = Bytes.putByte(backingArray, pos, type); int valLen = typeValLen - 1; - if (compression.hasValueCompression()) { + if (hasValueCompression) { readCompressedValue(in, backingArray, pos, valLen); pos += valLen; } else { @@ -382,7 +389,7 @@ protected Cell parseCell() throws IOException { // tags if (tagsLength > 0) { pos = Bytes.putAsShort(backingArray, pos, tagsLength); - if (compression.hasTagCompression()) { + if (hasTagCompression) { compression.tagCompressionContext.uncompressTags(in, backingArray, pos, tagsLength); } else { IOUtils.readFully(in, backingArray, pos, tagsLength); From 47b392a98707df09aa4617244be59348c9269278 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 12 May 2021 16:26:48 -0700 Subject: [PATCH 07/12] Switch to Hadoop compression codecs. - Nicer stream based API. - More options than just Deflate/GZ, especially SNAPPY. Allows the user to select from various points along the speed vs size continuum depending on available native runtime support in their environment. Snappy is strongly preferable if native support is available. --- .../hbase/io/DelegatingInputStream.java | 99 +++++++++++++++ .../src/main/protobuf/server/region/WAL.proto | 1 + .../wal/AbstractProtobufLogWriter.java | 37 ++++-- .../regionserver/wal/CompressionContext.java | 120 +++++++++++------- .../regionserver/wal/ProtobufLogReader.java | 15 +++ .../hbase/regionserver/wal/ReaderBase.java | 15 ++- .../hbase/regionserver/wal/WALCellCodec.java | 95 ++------------ .../wal/TestWALCellCodecWithCompression.java | 16 +-- 8 files changed, 248 insertions(+), 150 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java new file mode 100644 index 000000000000..dbea3ad4a531 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * An input stream that delegates all operations to another input stream. + * The delegate can be switched out for another at any time but to minimize the + * possibility of violating the InputStream contract it would be best to replace + * the delegate only once it has been fully consumed.
<p>
For example, a + * ByteArrayInputStream, which is implicitly bounded by the size of the underlying + * byte array can be converted into an unbounded stream fed by multiple instances + * of ByteArrayInputStream, switched out one for the other in sequence. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DelegatingInputStream extends InputStream { + + InputStream lowerStream; + + public DelegatingInputStream(InputStream lowerStream) { + this.lowerStream = lowerStream; + } + + public InputStream getDelegate() { + return lowerStream; + } + + public void setDelegate(InputStream lowerStream) { + this.lowerStream = lowerStream; + } + + @Override + public int read() throws IOException { + return lowerStream.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return lowerStream.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return lowerStream.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return lowerStream.skip(n); + } + + @Override + public int available() throws IOException { + return lowerStream.available(); + } + + @Override + public void close() throws IOException { + lowerStream.close(); + } + + @Override + public synchronized void mark(int readlimit) { + lowerStream.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + lowerStream.reset(); + } + + @Override + public boolean markSupported() { + return lowerStream.markSupported(); + } + +} diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto index 2add88bda1f0..7ab9721b2568 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto @@ -33,6 +33,7 @@ message WALHeader { optional string writer_cls_name = 4; optional string cell_codec_cls_name = 5; optional bool has_value_compression = 6; + optional uint32 value_compression_codec = 7; } /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 1e6792158c1b..906593e4d4a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.crypto.Encryptor; @@ -144,10 +145,21 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); if (doCompress) { try { + final boolean useTagCompression = + conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); + final boolean useValueCompression = + conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); + final Compression.Algorithm valueCompressionType = + CompressionContext.getValueCompressionAlgorithm(conf); + if (LOG.isDebugEnabled()) { + LOG.debug("Initializing compression context for {}: isRecoveredEdits={}" + + 
", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path, + CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression, + valueCompressionType); + } this.compressionContext = new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path), - conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true), - conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false)); + useTagCompression, useValueCompression, valueCompressionType); } catch (Exception e) { throw new IOException("Failed to initiate CompressionContext", e); } @@ -166,15 +178,20 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita initOutput(fs, path, overwritable, bufferSize, replication, blocksize); - boolean doTagCompress = doCompress - && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); - boolean doValueCompress = doCompress - && conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); + boolean doTagCompress = doCompress && + conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); + boolean doValueCompress = doCompress && + conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); + WALHeader.Builder headerBuilder = WALHeader.newBuilder() + .setHasCompression(doCompress) + .setHasTagCompression(doTagCompress) + .setHasValueCompression(doValueCompress); + if (doValueCompress) { + headerBuilder.setValueCompressionCodec( + CompressionContext.getValueCompressionAlgorithm(conf).ordinal()); + } length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, - buildWALHeader(conf, WALHeader.newBuilder() - .setHasCompression(doCompress) - .setHasTagCompression(doTagCompress) - .setHasValueCompression(doValueCompress)))); + buildWALHeader(conf, headerBuilder))); initAfterHeader(doCompress); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 9372819f9b74..e851cae605be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -18,17 +18,24 @@ package org.apache.hadoop.hbase.regionserver.wal; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.EnumMap; import java.util.Map; -import java.util.zip.Deflater; -import java.util.zip.Inflater; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.io.DelegatingInputStream; import org.apache.hadoop.hbase.io.TagCompressionContext; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.Dictionary; +import org.apache.yetus.audience.InterfaceAudience; /** * Context that holds the various dictionaries for compression in WAL. 
@@ -42,60 +49,71 @@ public class CompressionContext { public static final String ENABLE_WAL_VALUE_COMPRESSION = "hbase.regionserver.wal.value.enablecompression"; + public static final String WAL_VALUE_COMPRESSION_TYPE = + "hbase.regionserver.wal.value.compression.type"; + public enum DictionaryIndex { REGION, TABLE, FAMILY, QUALIFIER, ROW } /** - * Encapsulates the zlib deflater/inflater pair we will use for value compression in this WAL. + * Encapsulates the compression algorithm and its streams that we will use for value + * compression in this WAL. */ static class ValueCompressor { - final static int DEFAULT_DEFLATE_BUFFER_SIZE = 8*1024; - final static int MAX_DEFLATE_BUFFER_SIZE = 256*1024; - - final Deflater deflater; - final Inflater inflater; - byte[] deflateBuffer; - - public ValueCompressor() { - deflater = new Deflater(); - // Optimize for speed so we minimize the time spent writing the WAL. This still achieves - // quite good results. (This is not really user serviceable.) - deflater.setLevel(Deflater.BEST_SPEED); - inflater = new Inflater(); - } + + static final int IO_BUFFER_SIZE = 4096; - public Deflater getDeflater() { - return deflater; - } + private final Compression.Algorithm algorithm; + private DelegatingInputStream lowerIn; + private ByteArrayOutputStream lowerOut; + private InputStream compressedIn; + private OutputStream compressedOut; - public byte[] getDeflateBuffer() { - if (deflateBuffer == null) { - deflateBuffer = new byte[DEFAULT_DEFLATE_BUFFER_SIZE]; - } - return deflateBuffer; + public ValueCompressor(Compression.Algorithm algorithm) throws IOException { + this.algorithm = algorithm; } - public int getDeflateBufferSize() { - return deflateBuffer.length; + public Compression.Algorithm getAlgorithm() { + return algorithm; } - public void setDeflateBufferSize(int size) { - if (size > MAX_DEFLATE_BUFFER_SIZE) { - throw new IllegalArgumentException("Requested buffer size is too large, ask=" + size + - ", max=" + MAX_DEFLATE_BUFFER_SIZE); + public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) + throws IOException { + // We have to create the output streams here the first time around. + if (compressedOut == null) { + lowerOut = new ByteArrayOutputStream(); + compressedOut = algorithm.createCompressionStream(lowerOut, algorithm.getCompressor(), + IO_BUFFER_SIZE); + } else { + lowerOut.reset(); } - deflateBuffer = new byte[size]; + compressedOut.write(valueArray, valueOffset, valueLength); + compressedOut.flush(); + return lowerOut.toByteArray(); } - public Inflater getInflater() { - return inflater; + public void decompress(InputStream in, int inLength, byte[] outArray, int outOffset, + int outLength) throws IOException { + // Read all of the compressed bytes into a buffer. + byte[] inBuffer = new byte[inLength]; + IOUtils.readFully(in, inBuffer); + // We have to create the input streams here the first time around. 
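+ // On subsequent calls only the delegate ByteArrayInputStream is swapped out, so the
+ // decompression stream itself, and any state the codec keeps, is reused across values.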
+ if (compressedIn == null) { + lowerIn = new DelegatingInputStream(new ByteArrayInputStream(inBuffer)); + compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(), + IO_BUFFER_SIZE); + } else { + lowerIn.setDelegate(new ByteArrayInputStream(inBuffer)); + } + compressedIn.read(outArray, outOffset, outLength); } public void clear() { - deflater.reset(); - inflater.reset(); - deflateBuffer = null; + lowerIn = null; + compressedIn = null; + lowerOut = null; + compressedOut = null; } }; @@ -106,10 +124,11 @@ public void clear() { TagCompressionContext tagCompressionContext = null; ValueCompressor valueCompressor = null; - public CompressionContext(Class dictType, boolean recoveredEdits, - boolean hasTagCompression, boolean hasValueCompression) + public CompressionContext(Class dictType, + boolean recoveredEdits, boolean hasTagCompression, boolean hasValueCompression, + Compression.Algorithm valueCompressionType) throws SecurityException, NoSuchMethodException, InstantiationException, - IllegalAccessException, InvocationTargetException { + IllegalAccessException, InvocationTargetException, IOException { Constructor dictConstructor = dictType.getConstructor(); for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) { @@ -131,16 +150,16 @@ public CompressionContext(Class dictType, boolean recovere if (hasTagCompression) { tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE); } - if (hasValueCompression) { - valueCompressor = new ValueCompressor(); + if (hasValueCompression && valueCompressionType != null) { + valueCompressor = new ValueCompressor(valueCompressionType); } } public CompressionContext(Class dictType, boolean recoveredEdits, boolean hasTagCompression) throws SecurityException, NoSuchMethodException, InstantiationException, - IllegalAccessException, InvocationTargetException { - this(dictType, recoveredEdits, hasTagCompression, false); + IllegalAccessException, InvocationTargetException, IOException { + this(dictType, recoveredEdits, hasTagCompression, false, null); } public boolean hasTagCompression() { @@ -171,4 +190,15 @@ void clear() { } } + public static Compression.Algorithm getValueCompressionAlgorithm(Configuration conf) { + if (conf.getBoolean(ENABLE_WAL_VALUE_COMPRESSION, true)) { + String compressionType = conf.get(WAL_VALUE_COMPRESSION_TYPE); + if (compressionType != null) { + return Compression.getCompressionAlgorithmByName(compressionType); + } + return Compression.Algorithm.GZ; + } + return Compression.Algorithm.NONE; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index ce27313a211b..7c78824a169a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -82,6 +83,7 @@ public class ProtobufLogReader extends ReaderBase { protected boolean hasCompression = false; protected boolean hasTagCompression = false; protected boolean 
hasValueCompression = false; + protected Compression.Algorithm valueCompressionType = null; // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit // entry in the wal, the inputstream's position is equal to walEditsStopOffset. private long walEditsStopOffset; @@ -229,6 +231,14 @@ private String initInternal(FSDataInputStream stream, boolean isFirst) this.hasCompression = header.hasHasCompression() && header.getHasCompression(); this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression(); this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression(); + if (header.hasValueCompressionCodec()) { + try { + this.valueCompressionType = + Compression.Algorithm.values()[header.getValueCompressionCodec()]; + } catch (ArrayIndexOutOfBoundsException e) { + throw new IOException("Invalid compression type", e); + } + } } this.inputStream = stream; this.walEditsStopOffset = this.fileLength; @@ -334,6 +344,11 @@ protected boolean hasValueCompression() { return this.hasValueCompression; } + @Override + protected Compression.Algorithm getValueCompressionType() { + return this.valueCompressionType; + } + @Override protected boolean readNext(Entry entry) throws IOException { // OriginalPosition might be < 0 on local fs; if so, it is useless to us. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index c56e408566fc..b619aa460078 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -68,8 +69,15 @@ public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream // If compression is enabled, new dictionaries are created here. try { if (compressionContext == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Initializing compression context for {}: isRecoveredEdits={}" + + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path, + CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), hasValueCompression(), + getValueCompressionType()); + } compressionContext = new CompressionContext(LRUDictionary.class, - CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), hasValueCompression()); + CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), + hasValueCompression(), getValueCompressionType()); } else { compressionContext.clear(); } @@ -156,6 +164,11 @@ public void seek(long pos) throws IOException { */ protected abstract boolean hasValueCompression(); + /** + * @return Value compression algorithm for this log. + */ + protected abstract Compression.Algorithm getValueCompressionType(); + /** * Read next entry. * @param e The entry to read into. 
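As getValueCompressionAlgorithm above shows, the value codec is selected from configuration, defaulting to GZ when no type is named. A minimal sketch, not diff content, of enabling the feature from server-side setup code follows; the "snappy" algorithm name is an assumption based on Compression.Algorithm's lower-case naming and requires native Snappy support on the region servers:

  Configuration conf = HBaseConfiguration.create();
  // General WAL compression must be on before value compression takes effect.
  conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
  // Enable value compression and choose the algorithm applied to cell values.
  conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
  conf.set(CompressionContext.WAL_VALUE_COMPRESSION_TYPE, "snappy");

The two boolean settings are the same ones the new value compression tests toggle in their setup.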
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index 5ec2adb8bf98..d80c0e0ae1dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -21,9 +21,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.zip.DataFormatException; -import java.util.zip.Deflater; -import java.util.zip.Inflater; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -42,7 +39,6 @@ import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.regionserver.wal.CompressionContext.ValueCompressor; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -253,9 +249,7 @@ public void write(Cell cell) throws IOException { StreamUtils.writeLong(out, cell.getTimestamp()); out.write(cell.getTypeByte()); if (hasValueCompression) { - byte[] compressedBytes = compressValue(cell); - StreamUtils.writeRawVInt32(out, compressedBytes.length); - out.write(compressedBytes); + writeCompressedValue(out, cell); } else { PrivateCellUtil.writeValue(out, cell, cell.getValueLength()); } @@ -271,52 +265,11 @@ public void write(Cell cell) throws IOException { } } - private byte[] compressValue(Cell cell) throws IOException { - ValueCompressor valueCompressor = compression.getValueCompressor(); - Deflater deflater = valueCompressor.getDeflater(); - deflater.setInput(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - // Deflater#deflate will return 0 only if more input is required. We iterate until - // that condition is met, sending the content of 'buffer' to the output stream at - // each step, until deflate returns 0. Then the compressor must be flushed in order - // for all of the value's output to be written into the corresponding edit. (Otherwise - // the compressor would carry over some of the output for this value into the output - // of the next.) To flush the compressor we call deflate again using the method option - // that allows us to specify the SYNC_FLUSH flag. The sync output will be placed into - // the buffer. When flushing we iterate until there is no more output. Then the flush - // is complete and the compressor is ready for more input. - int bytesOut; - do { - bytesOut = deflater.deflate(valueCompressor.getDeflateBuffer()); - if (bytesOut > 0) { - baos.write(valueCompressor.getDeflateBuffer(), 0, bytesOut); - } - } while (bytesOut > 0); - // Done compressing value, now flush until deflater buffers are empty. - // If we don't have enough space in the buffer to fully flush, the buffer must be - // resized. - boolean finished = false; - do { - bytesOut = deflater.deflate(valueCompressor.getDeflateBuffer(), 0, - valueCompressor.getDeflateBufferSize(), Deflater.SYNC_FLUSH); - if (bytesOut == 0) { - throw new IOException("Deflater state error: SYNC_FLUSH did not flush"); - } - if (bytesOut == valueCompressor.getDeflateBufferSize()) { - // Resize the output buffer. 
- // If we eventually ask for a buffer size that is too large, setDeflateBufferSize - // will throw an IllegalArgumentException. - try { - valueCompressor.setDeflateBufferSize(valueCompressor.getDeflateBufferSize() * 2); - } catch (IllegalArgumentException e) { - throw new IOException("Deflater state error: exceeded max deflate buffer size", e); - } - } else { - baos.write(valueCompressor.getDeflateBuffer(), 0, bytesOut); - finished = true; - } - } while (!finished); - return baos.toByteArray(); + private void writeCompressedValue(OutputStream out, Cell cell) throws IOException { + byte[] compressed = compression.getValueCompressor().compress(cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength()); + StreamUtils.writeRawVInt32(out, compressed.length); + out.write(compressed); } } @@ -376,8 +329,7 @@ protected Cell parseCell() throws IOException { if (tagsLength > 0) { typeValLen = typeValLen - tagsLength - KeyValue.TAGS_LENGTH_SIZE; } - byte type = (byte)in.read(); - pos = Bytes.putByte(backingArray, pos, type); + pos = Bytes.putByte(backingArray, pos, (byte)in.read()); int valLen = typeValLen - 1; if (hasValueCompression) { readCompressedValue(in, backingArray, pos, valLen); @@ -428,37 +380,8 @@ private static void checkLength(int len, int max) throws IOException { private void readCompressedValue(InputStream in, byte[] outArray, int outOffset, int expectedLength) throws IOException { - // Read the size of the compressed value. We serialized it as a vint32. - int compressedLength = StreamUtils.readRawVarint32(in); - // Read all of the compressed value into a buffer for the Inflater. - byte[] buffer = new byte[compressedLength]; - IOUtils.readFully(in, buffer, 0, compressedLength); - // Inflate the compressed value. We know the uncompressed size. Inflator#inflate will - // return nonzero for as long as some compressed input remains, and 0 when done. 
- Inflater inflater = compression.getValueCompressor().getInflater(); - inflater.setInput(buffer); - int remaining = expectedLength; - boolean finished = false; - do { - try { - int inflatedBytes = inflater.inflate(outArray, outOffset, remaining); - if (inflatedBytes == 0) { - finished = true; - } - outOffset += inflatedBytes; - remaining -= inflatedBytes; - if (remaining == 0) { - finished = true; - } else if (remaining < 0) { - throw new IOException("Inflater state error: 'remaining' went negative"); - } - } catch (DataFormatException e) { - throw new IOException(e); - } - } while (!finished); - if (remaining > 0) { - throw new IOException("Inflater state error: inflator finished early"); - } + int compressedLen = StreamUtils.readRawVarint32(in); + compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset, expectedLength); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java index 22570310e7f8..4e091ca34964 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.codec.Codec.Decoder; import org.apache.hadoop.hbase.codec.Codec.Encoder; +import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -54,22 +55,22 @@ public class TestWALCellCodecWithCompression { @Test public void testEncodeDecodeKVsWithTags() throws Exception { - doTest(false, false, false); + doTest(false, false); } @Test public void testEncodeDecodeKVsWithTagsWithTagsCompression() throws Exception { - doTest(true, false, false); + doTest(true, false); } @Test public void testEncodeDecodeOffKVsWithTagsWithTagsCompression() throws Exception { - doTest(true, false, true); + doTest(true, false); } @Test public void testValueCompressionEnabled() throws Exception { - doTest(false, true, false); + doTest(false, true); } @Test @@ -92,7 +93,7 @@ public void testValueCompression() throws Exception { Configuration conf = new Configuration(false); WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, - false, true, true)); + false, true, true, Compression.Algorithm.GZ)); ByteArrayOutputStream bos = new ByteArrayOutputStream(); Encoder encoder = codec.getEncoder(bos); encoder.write(createKV(row_1, value_1, 0)); @@ -148,15 +149,14 @@ static void fillBytes(byte[] buffer, byte[] fill) { } } - private void doTest(boolean compressTags, boolean compressValue, boolean offheapKV) + private void doTest(boolean compressTags, boolean offheapKV) throws Exception { final byte[] key = Bytes.toBytes("myRow"); final byte[] value = Bytes.toBytes("myValue"); Configuration conf = new Configuration(false); conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags); - conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, compressValue); WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, - false, compressTags, compressValue)); + false, compressTags)); ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); Encoder encoder = 
codec.getEncoder(bos); if (offheapKV) { From 80d346a5245083f626a1301420edb7954d1e5de6 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 14 May 2021 16:19:33 -0700 Subject: [PATCH 08/12] Fix findbugs warning in CompressionContext --- .../hadoop/hbase/regionserver/wal/CompressionContext.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index e851cae605be..c15c833b943d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -93,7 +93,7 @@ public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) return lowerOut.toByteArray(); } - public void decompress(InputStream in, int inLength, byte[] outArray, int outOffset, + public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset, int outLength) throws IOException { // Read all of the compressed bytes into a buffer. byte[] inBuffer = new byte[inLength]; @@ -106,7 +106,7 @@ public void decompress(InputStream in, int inLength, byte[] outArray, int outOff } else { lowerIn.setDelegate(new ByteArrayInputStream(inBuffer)); } - compressedIn.read(outArray, outOffset, outLength); + return compressedIn.read(outArray, outOffset, outLength); } public void clear() { From b032da3d337167b59a3c21c2a79ccdf066dfaaf2 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 18 May 2021 18:12:35 -0700 Subject: [PATCH 09/12] Address final round of review feedback Add additional unit test TestCompressedWAL --- .../hbase/io/DelegatingInputStream.java | 33 ++-- .../src/main/protobuf/server/region/WAL.proto | 2 +- .../wal/AbstractProtobufLogWriter.java | 13 +- .../regionserver/wal/CompressionContext.java | 69 ++++++-- .../regionserver/wal/ProtobufLogReader.java | 13 +- .../hbase/regionserver/wal/ReaderBase.java | 6 +- .../hbase/regionserver/wal/WALCellCodec.java | 6 +- .../hadoop/hbase/wal/TestCompressedWAL.java | 159 ++++++++++++++++++ 8 files changed, 261 insertions(+), 40 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java index dbea3ad4a531..2bd5266f28ca 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java @@ -20,9 +20,9 @@ import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.atomic.AtomicReference; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; /** * An input stream that delegates all operations to another input stream. @@ -32,68 +32,71 @@ * ByteArrayInputStream, which is implicitly bounded by the size of the underlying * byte array can be converted into an unbounded stream fed by multiple instances * of ByteArrayInputStream, switched out one for the other in sequence. + *
<p>
+ * Although multithreaded access is allowed, users of this class will want to take + * care to order operations on this stream and the swap out of one delegate for + * another in a way that provides a valid view of stream contents. */ @InterfaceAudience.Private -@InterfaceStability.Evolving public class DelegatingInputStream extends InputStream { - InputStream lowerStream; + final AtomicReference lowerStream = new AtomicReference<>(); public DelegatingInputStream(InputStream lowerStream) { - this.lowerStream = lowerStream; + this.lowerStream.set(lowerStream); } public InputStream getDelegate() { - return lowerStream; + return lowerStream.get(); } public void setDelegate(InputStream lowerStream) { - this.lowerStream = lowerStream; + this.lowerStream.set(lowerStream); } @Override public int read() throws IOException { - return lowerStream.read(); + return lowerStream.get().read(); } @Override public int read(byte[] b) throws IOException { - return lowerStream.read(b); + return lowerStream.get().read(b); } @Override public int read(byte[] b, int off, int len) throws IOException { - return lowerStream.read(b, off, len); + return lowerStream.get().read(b, off, len); } @Override public long skip(long n) throws IOException { - return lowerStream.skip(n); + return lowerStream.get().skip(n); } @Override public int available() throws IOException { - return lowerStream.available(); + return lowerStream.get().available(); } @Override public void close() throws IOException { - lowerStream.close(); + lowerStream.get().close(); } @Override public synchronized void mark(int readlimit) { - lowerStream.mark(readlimit); + lowerStream.get().mark(readlimit); } @Override public synchronized void reset() throws IOException { - lowerStream.reset(); + lowerStream.get().reset(); } @Override public boolean markSupported() { - return lowerStream.markSupported(); + return lowerStream.get().markSupported(); } } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto index 7ab9721b2568..48a108bb8a79 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto @@ -33,7 +33,7 @@ message WALHeader { optional string writer_cls_name = 4; optional string cell_codec_cls_name = 5; optional bool has_value_compression = 6; - optional uint32 value_compression_codec = 7; + optional uint32 value_compression_algorithm = 7; } /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index 906593e4d4a0..3b84488d1dc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -150,9 +150,10 @@ private boolean initializeCompressionContext(Configuration conf, Path path) thro final boolean useValueCompression = conf.getBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); final Compression.Algorithm valueCompressionType = - CompressionContext.getValueCompressionAlgorithm(conf); - if (LOG.isDebugEnabled()) { - LOG.debug("Initializing compression context for {}: isRecoveredEdits={}" + + useValueCompression ? 
CompressionContext.getValueCompressionAlgorithm(conf) : + Compression.Algorithm.NONE; + if (LOG.isTraceEnabled()) { + LOG.trace("Initializing compression context for {}: isRecoveredEdits={}" + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path, CommonFSUtils.isRecoveredEdits(path), useTagCompression, useValueCompression, valueCompressionType); @@ -187,7 +188,7 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita .setHasTagCompression(doTagCompress) .setHasValueCompression(doValueCompress); if (doValueCompress) { - headerBuilder.setValueCompressionCodec( + headerBuilder.setValueCompressionAlgorithm( CompressionContext.getValueCompressionAlgorithm(conf).ordinal()); } length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, @@ -197,8 +198,10 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita // instantiate trailer to default value. trailer = WALTrailer.newBuilder().build(); + if (LOG.isTraceEnabled()) { - LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress); + LOG.trace("Initialized protobuf WAL={}, compression={}, tagCompression={}" + + ", valueCompression={}", path, doCompress, doTagCompress, doValueCompress); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index c15c833b943d..0b740d4c01e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -27,7 +27,6 @@ import java.lang.reflect.InvocationTargetException; import java.util.EnumMap; import java.util.Map; - import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -36,13 +35,20 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.Dictionary; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Context that holds the various dictionaries for compression in WAL. + *
<p>
+ * CompressionContexts are not expected to be shared among threads. Multithreaded use may + * produce unexpected results. */ @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) public class CompressionContext { + private static final Logger LOG = LoggerFactory.getLogger(CompressionContext.class); + public static final String ENABLE_WAL_TAGS_COMPRESSION = "hbase.regionserver.wal.tags.enablecompression"; @@ -70,7 +76,7 @@ static class ValueCompressor { private InputStream compressedIn; private OutputStream compressedOut; - public ValueCompressor(Compression.Algorithm algorithm) throws IOException { + public ValueCompressor(Compression.Algorithm algorithm) { this.algorithm = algorithm; } @@ -80,8 +86,8 @@ public Compression.Algorithm getAlgorithm() { public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) throws IOException { - // We have to create the output streams here the first time around. if (compressedOut == null) { + // Create the output streams here the first time around. lowerOut = new ByteArrayOutputStream(); compressedOut = algorithm.createCompressionStream(lowerOut, algorithm.getCompressor(), IO_BUFFER_SIZE); @@ -95,10 +101,22 @@ public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset, int outLength) throws IOException { - // Read all of the compressed bytes into a buffer. + + // We handle input as a sequence of byte[] arrays (call them segments), with + // DelegatingInputStream providing a way to switch in a new segment, wrapped in a + // ByteArrayInputStream, when the old segment has been fully consumed. + + // Originally I looked at using BoundedInputStream but you can't reuse/reset the + // BIS instance, and we can't just create new streams each time around because + // that would reset compression codec state, which must accumulate over all values + // in the file in order to build the dictionary in the same way as the compressor + // did. + + // Read in all of the next segment of compressed bytes to process. byte[] inBuffer = new byte[inLength]; IOUtils.readFully(in, inBuffer); - // We have to create the input streams here the first time around. + + // Create the input streams here the first time around. if (compressedIn == null) { lowerIn = new DelegatingInputStream(new ByteArrayInputStream(inBuffer)); compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(), @@ -106,17 +124,48 @@ public int decompress(InputStream in, int inLength, byte[] outArray, int outOffs } else { lowerIn.setDelegate(new ByteArrayInputStream(inBuffer)); } + + // Caller must handle short reads. With current Hadoop compression codecs all 'outLength' + // bytes are read in here, so not an issue now. 
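+ // A stricter alternative would be to loop on read() until 'outLength' bytes have been
+ // consumed; instead the caller, WALCellCodec#readCompressedValue, checks the count
+ // returned here and throws if it sees a short read.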
return compressedIn.read(outArray, outOffset, outLength); } public void clear() { - lowerIn = null; - compressedIn = null; - lowerOut = null; + if (compressedOut != null) { + try { + compressedOut.close(); + } catch (IOException e) { + LOG.warn("Exception closing compressed output stream", e); + } + } compressedOut = null; + if (lowerOut != null) { + try { + lowerOut.close(); + } catch (IOException e) { + LOG.warn("Exception closing lower output stream", e); + } + } + lowerOut = null; + if (compressedIn != null) { + try { + compressedIn.close(); + } catch (IOException e) { + LOG.warn("Exception closing compressed input stream", e); + } + } + compressedIn = null; + if (lowerIn != null) { + try { + lowerIn.close(); + } catch (IOException e) { + LOG.warn("Exception closing lower input stream", e); + } + } + lowerIn = null; } - }; + } private final Map dictionaries = new EnumMap<>(DictionaryIndex.class); @@ -170,7 +219,7 @@ public boolean hasValueCompression() { return valueCompressor != null; } - public Dictionary getDictionary(Enum dictIndex) { + public Dictionary getDictionary(Enum dictIndex) { return dictionaries.get(dictIndex); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 7c78824a169a..c86dd4d130bd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -230,11 +230,12 @@ private String initInternal(FSDataInputStream stream, boolean isFirst) WALProtos.WALHeader header = builder.build(); this.hasCompression = header.hasHasCompression() && header.getHasCompression(); this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression(); - this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression(); - if (header.hasValueCompressionCodec()) { + this.hasValueCompression = header.hasHasValueCompression() && + header.getHasValueCompression(); + if (header.hasValueCompressionAlgorithm()) { try { this.valueCompressionType = - Compression.Algorithm.values()[header.getValueCompressionCodec()]; + Compression.Algorithm.values()[header.getValueCompressionAlgorithm()]; } catch (ArrayIndexOutOfBoundsException e) { throw new IOException("Invalid compression type", e); } @@ -247,7 +248,9 @@ private String initInternal(FSDataInputStream stream, boolean isFirst) this.seekOnFs(currentPosition); if (LOG.isTraceEnabled()) { LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset - + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false") + ", currentPosition: " + currentPosition); + + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + + (trailerPresent ? 
"true, size: " + trailer.getSerializedSize() : "false") + + ", currentPosition: " + currentPosition); } codecClsName = hdrCtxt.getCellCodecClsName(); @@ -345,7 +348,7 @@ protected boolean hasValueCompression() { } @Override - protected Compression.Algorithm getValueCompressionType() { + protected Compression.Algorithm getValueCompressionAlgorithm() { return this.valueCompressionType; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index b619aa460078..90a1653a5140 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -73,11 +73,11 @@ public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream LOG.debug("Initializing compression context for {}: isRecoveredEdits={}" + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}", path, CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), hasValueCompression(), - getValueCompressionType()); + getValueCompressionAlgorithm()); } compressionContext = new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path), hasTagCompression(), - hasValueCompression(), getValueCompressionType()); + hasValueCompression(), getValueCompressionAlgorithm()); } else { compressionContext.clear(); } @@ -167,7 +167,7 @@ public void seek(long pos) throws IOException { /** * @return Value compression algorithm for this log. */ - protected abstract Compression.Algorithm getValueCompressionType(); + protected abstract Compression.Algorithm getValueCompressionAlgorithm(); /** * Read next entry. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java index d80c0e0ae1dc..31eccc7a18af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java @@ -381,7 +381,11 @@ private static void checkLength(int len, int max) throws IOException { private void readCompressedValue(InputStream in, byte[] outArray, int outOffset, int expectedLength) throws IOException { int compressedLen = StreamUtils.readRawVarint32(in); - compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset, expectedLength); + int read = compression.getValueCompressor().decompress(in, compressedLen, outArray, + outOffset, expectedLength); + if (read != expectedLength) { + throw new IOException("ValueCompressor state error: short read"); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java new file mode 100644 index 000000000000..6df0d1d20ea2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestCompressedWAL.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestCompressedWAL { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCompressedWAL.class); + + static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @Rule + public TestName name = new TestName(); + + @Parameter + public String walProvider; + + @Parameters(name = "{index}: provider={0}") + public static Iterable data() { + return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" }); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDirOnTestFS()); + TEST_UTIL.startMiniDFSCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() { + TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider); + TEST_UTIL.getConfiguration() + .setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + } + + @Test + public void testCompressedWAL() throws Exception { + TEST_UTIL.getConfiguration() + .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, false); + doTest(); + } + + @Test + public void testCompressedWALWithValueCompression() throws Exception { + TEST_UTIL.getConfiguration() + .setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true); + doTest(); + } + + private void doTest() throws Exception { + 
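+    // Write 'total' single-cell edits through the configured WAL provider, sync and close
+    // the writer, then read the file back and verify each cell's row, family, and value,
+    // plus the overall edit count.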
TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_")); + NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); + scopes.put(tableName.getName(), 0); + RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); + final int total = 1000; + final byte[] row = Bytes.toBytes("row"); + final byte[] family = Bytes.toBytes("family"); + final byte[] value = Bytes.toBytes("Test value"); + FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); + final WALFactory wals = + new WALFactory(TEST_UTIL.getConfiguration(), tableName.getNameAsString()); + + // Write the WAL + final WAL wal = wals.getWAL(regionInfo); + + MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + + for (int i = 0; i < total; i++) { + WALEdit kvs = new WALEdit(); + kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); + wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), kvs); + } + wal.sync(); + final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); + wals.shutdown(); + + // Confirm the WAL can be read back + WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath); + int count = 0; + WAL.Entry entry = new WAL.Entry(); + while (reader.next(entry) != null) { + count++; + List cells = entry.getEdit().getCells(); + assertTrue("Should be one KV per WALEdit", cells.size() == 1); + for (Cell cell: cells) { + assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), row, 0, row.length)); + assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), family, 0, family.length)); + assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength(), value, 0, value.length)); + } + } + assertEquals("Should have read back as many KVs as written", total, count); + reader.close(); + } +} From a0a338400d1d7b745a82891ca5bd8fbd9c2fb232 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 19 May 2021 12:33:36 -0700 Subject: [PATCH 10/12] Extend DelegatingInputStream from FilterInputStream --- .../hbase/io/DelegatingInputStream.java | 62 +++---------------- 1 file changed, 7 insertions(+), 55 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java index 2bd5266f28ca..0c551e1b933d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java @@ -18,9 +18,8 @@ package org.apache.hadoop.hbase.io; -import java.io.IOException; +import java.io.FilterInputStream; import java.io.InputStream; -import java.util.concurrent.atomic.AtomicReference; import org.apache.yetus.audience.InterfaceAudience; @@ -38,65 +37,18 @@ * another in a way that provides a valid view of stream contents. 
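 *
 * <p>An illustrative (hypothetical) usage pattern, reusing one wrapper across inputs:
 * <pre>
 *   DelegatingInputStream dis = new DelegatingInputStream(firstSegment);
 *   // ... read from dis until the current segment is exhausted ...
 *   dis.setDelegate(nextSegment); // switch inputs; downstream readers are undisturbed
 * </pre>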
*/ @InterfaceAudience.Private -public class DelegatingInputStream extends InputStream { +public class DelegatingInputStream extends FilterInputStream { - final AtomicReference lowerStream = new AtomicReference<>(); - - public DelegatingInputStream(InputStream lowerStream) { - this.lowerStream.set(lowerStream); + public DelegatingInputStream(InputStream in) { + super(in); } public InputStream getDelegate() { - return lowerStream.get(); - } - - public void setDelegate(InputStream lowerStream) { - this.lowerStream.set(lowerStream); - } - - @Override - public int read() throws IOException { - return lowerStream.get().read(); - } - - @Override - public int read(byte[] b) throws IOException { - return lowerStream.get().read(b); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return lowerStream.get().read(b, off, len); - } - - @Override - public long skip(long n) throws IOException { - return lowerStream.get().skip(n); - } - - @Override - public int available() throws IOException { - return lowerStream.get().available(); - } - - @Override - public void close() throws IOException { - lowerStream.get().close(); - } - - @Override - public synchronized void mark(int readlimit) { - lowerStream.get().mark(readlimit); - } - - @Override - public synchronized void reset() throws IOException { - lowerStream.get().reset(); + return this.in; } - @Override - public boolean markSupported() { - return lowerStream.get().markSupported(); + public void setDelegate(InputStream in) { + this.in = in; } } From f2202acae072bedab051f4f9374a62d2647dba66 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Wed, 19 May 2021 13:32:44 -0700 Subject: [PATCH 11/12] Avoid a copy during decompression with new BoundedDelegatingInputStream. --- .../io/BoundedDelegatingInputStream.java | 111 ++++++++++++++++++ .../hbase/io/DelegatingInputStream.java | 2 +- .../regionserver/wal/CompressionContext.java | 31 ++--- 3 files changed, 122 insertions(+), 22 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java new file mode 100644 index 000000000000..e9a3b67bd081 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * This is a stream that will only supply bytes from its delegate up to a certain limit. 
+ * When there is an attempt to set the position beyond that it will signal that the input + * is finished. + */ +@InterfaceAudience.Private +public class BoundedDelegatingInputStream extends DelegatingInputStream { + + protected long limit; + protected long pos; + + public BoundedDelegatingInputStream(InputStream in, long limit) { + super(in); + this.limit = limit; + this.pos = 0; + } + + public void setDelegate(InputStream in, long limit) { + this.in = in; + this.limit = limit; + this.pos = 0; + } + + /** + * Call the delegate's {@code read()} method if the current position is less than the limit. + * @return the byte read or -1 if the end of stream or the limit has been reached. + */ + @Override + public int read() throws IOException { + if (pos >= limit) { + return -1; + } + int result = in.read(); + pos++; + return result; + } + + /** + * Call the delegate's {@code read(byte[], int, int)} method if the current position is less + * than the limit. + * @param b read buffer + * @param off Start offset + * @param len The number of bytes to read + * @return the number of bytes read or -1 if the end of stream or the limit has been reached. + */ + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + if (pos >= limit) { + return -1; + } + long readLen = Math.min(len, limit - pos); + int read = in.read(b, off, (int)readLen); + if (read < 0) { + return -1; + } + pos += read; + return read; + } + + /** + * Call the delegate's {@code skip(long)} method. + * @param len the number of bytes to skip + * @return the actual number of bytes skipped + */ + @Override + public long skip(final long len) throws IOException { + long skipped = in.skip(Math.min(len, limit - pos)); + pos += skipped; + return skipped; + } + + /** + * Call the delegate's {@code available()} method. + * @return the delegate's available bytes if the current position is less than the limit, + * or 0 otherwise + */ + @Override + public int available() throws IOException { + if (pos >= limit) { + return 0; + } + int available = in.available(); + return (int) Math.min(available, limit - pos); + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java index 0c551e1b933d..6bd82ae10641 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/DelegatingInputStream.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 0b740d4c01e1..292ff62a6137 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.regionserver.wal; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -27,10 +26,9 @@ import java.lang.reflect.InvocationTargetException; import java.util.EnumMap; import java.util.Map; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.io.DelegatingInputStream; +import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream; import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.util.Dictionary; @@ -71,7 +69,7 @@ static class ValueCompressor { static final int IO_BUFFER_SIZE = 4096; private final Compression.Algorithm algorithm; - private DelegatingInputStream lowerIn; + private BoundedDelegatingInputStream lowerIn; private ByteArrayOutputStream lowerOut; private InputStream compressedIn; private OutputStream compressedOut; @@ -102,31 +100,22 @@ public byte[] compress(byte[] valueArray, int valueOffset, int valueLength) public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset, int outLength) throws IOException { - // We handle input as a sequence of byte[] arrays (call them segments), with - // DelegatingInputStream providing a way to switch in a new segment, wrapped in a - // ByteArrayInputStream, when the old segment has been fully consumed. - - // Originally I looked at using BoundedInputStream but you can't reuse/reset the - // BIS instance, and we can't just create new streams each time around because - // that would reset compression codec state, which must accumulate over all values - // in the file in order to build the dictionary in the same way as the compressor - // did. - - // Read in all of the next segment of compressed bytes to process. - byte[] inBuffer = new byte[inLength]; - IOUtils.readFully(in, inBuffer); + // Our input is a sequence of bounded byte ranges (call them segments), with + // BoundedDelegatingInputStream providing a way to switch in a new segment when the + // previous segment has been fully consumed. // Create the input streams here the first time around. if (compressedIn == null) { - lowerIn = new DelegatingInputStream(new ByteArrayInputStream(inBuffer)); + lowerIn = new BoundedDelegatingInputStream(in, inLength); compressedIn = algorithm.createDecompressionStream(lowerIn, algorithm.getDecompressor(), IO_BUFFER_SIZE); } else { - lowerIn.setDelegate(new ByteArrayInputStream(inBuffer)); + lowerIn.setDelegate(in, inLength); } - // Caller must handle short reads. With current Hadoop compression codecs all 'outLength' - // bytes are read in here, so not an issue now. + // Caller must handle short reads. + // With current Hadoop compression codecs all 'outLength' bytes are read in here, so not + // an issue for now. 
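+      // Bounding the live WAL stream to this value's compressed bytes avoids the copy into
+      // a temporary buffer that the previous implementation made, and it keeps the
+      // decompressor's buffered reads from consuming bytes that belong to the next field.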
return compressedIn.read(outArray, outOffset, outLength); } From e4f8f7f3921579aac9a4dd33bd9d0b47f232cce8 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Thu, 20 May 2021 12:35:23 -0700 Subject: [PATCH 12/12] Fix whitespace and javadoc formatting nits in latest precommit report --- .../apache/hadoop/hbase/io/BoundedDelegatingInputStream.java | 4 ++-- .../hadoop/hbase/regionserver/wal/CompressionContext.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java index e9a3b67bd081..c7002114099b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java @@ -96,8 +96,8 @@ public long skip(final long len) throws IOException { /** * Call the delegate's {@code available()} method. - * @return the delegate's available bytes if the current position is less than the limit, - * or 0 otherwise + * @return the delegate's available bytes if the current position is less than the + * limit, or 0 otherwise. */ @Override public int available() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java index 292ff62a6137..82bad934393c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java @@ -65,7 +65,7 @@ public enum DictionaryIndex { * compression in this WAL. */ static class ValueCompressor { - + static final int IO_BUFFER_SIZE = 4096; private final Compression.Algorithm algorithm; @@ -101,7 +101,7 @@ public int decompress(InputStream in, int inLength, byte[] outArray, int outOffs int outLength) throws IOException { // Our input is a sequence of bounded byte ranges (call them segments), with - // BoundedDelegatingInputStream providing a way to switch in a new segment when the + // BoundedDelegatingInputStream providing a way to switch in a new segment when the // previous segment has been fully consumed. // Create the input streams here the first time around.
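
For readers following the decompression changes above, here is a standalone sketch (not part
of the patch series; the class and variable names are made up and the stream contents are
arbitrary) of the bounded-delegate behavior the new decompress() path relies on: the wrapper
presents at most 'limit' bytes of the underlying stream, then reports EOF, and can be
re-pointed at the next bounded segment without recreating any downstream streams.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;

public class BoundedDelegateSketch {
  public static void main(String[] args) throws IOException {
    byte[] wire = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 };
    ByteArrayInputStream in = new ByteArrayInputStream(wire);

    // Present only the first 3 bytes of the delegate to the consumer.
    BoundedDelegatingInputStream bounded = new BoundedDelegatingInputStream(in, 3);
    byte[] buf = new byte[8];
    int first = bounded.read(buf, 0, buf.length); // 3: capped at the limit
    int atEof = bounded.read();                   // -1: the limit acts as EOF

    // Re-point the same wrapper at the remaining bytes, as the WAL reader does for the
    // next compressed value, leaving downstream stream state intact.
    bounded.setDelegate(in, 5);
    int second = bounded.read(buf, 0, buf.length); // 5: the rest of the delegate

    System.out.println(first + " " + atEof + " " + second); // prints "3 -1 5"
  }
}

Because one wrapper is reused and simply re-pointed at each value's byte range, the
compressed WAL stream is consumed in place with no per-value copy, which is the point of
the final refactor above.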