From fd21d8b4f3dad5706d6335180fcbefe2676f545e Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Thu, 10 Jun 2021 10:14:57 -0700
Subject: [PATCH 1/6] HBASE-25994 Active WAL tailing fails when WAL value
 compression is enabled

Depending on which compression codec is used, a short read of the
compressed bytes can cause catastrophic errors that confuse the WAL
reader. This problem can manifest when the reader is actively tailing
the WAL for replication. The input stream's available() method
sometimes lies, so it cannot be relied upon. To avoid these issues when
WAL value compression is enabled, ensure all bytes of the compressed
value are read in, and thus available, before submitting the payload to
the decompressor.

Add TestReplicationCompressedWAL and TestReplicationValueCompressedWAL.
Without the WALCellCodec change, TestReplicationValueCompressedWAL will
fail.
---
 .../hbase/regionserver/wal/WALCellCodec.java  |  18 ++-
 .../TestReplicationCompressedWAL.java         | 108 ++++++++++++++++++
 .../TestReplicationValueCompressedWAL.java    |  60 ++++++++++
 3 files changed, 182 insertions(+), 4 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 31eccc7a18af..0baf75d5a617 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;
@@ -258,11 +260,14 @@ public void write(Cell cell) throws IOException {
         // Write tags using Dictionary compression
         PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
       } else {
-        // Tag compression is disabled within the WAL compression. Just write the tags bytes as
-        // it is.
+        // Tag compression is disabled within the WAL compression. Just write the tags bytes
+        // as it is.
         PrivateCellUtil.writeTags(out, cell, tagsLength);
       }
     }
+    // Flush the output stream now to minimize the chance a reader actively tailing the
+    // WAL will encounter a short read.
+    out.flush();
   }
 
   private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
@@ -381,8 +386,13 @@ private static void checkLength(int len, int max) throws IOException {
   private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
       int expectedLength) throws IOException {
     int compressedLen = StreamUtils.readRawVarint32(in);
-    int read = compression.getValueCompressor().decompress(in, compressedLen, outArray,
-      outOffset, expectedLength);
+    // A partial read of the compressed bytes, depending on which compression codec is used,
+    // can cause messy IO errors. This can happen when the reader is actively tailing a file
+    // being written, for replication.
+    byte[] buffer = new byte[compressedLen];
+    IOUtils.readFully(in, buffer, 0, compressedLen);
+    int read = compression.getValueCompressor().decompress(new ByteArrayInputStream(buffer),
+      compressedLen, outArray, outOffset, expectedLength);
    if (read != expectedLength) {
      throw new IOException("ValueCompressor state error: short read");
    }
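The readCompressedValue() hunk above carries the whole fix in this first approach: drain exactly compressedLen bytes from the WAL stream before the decompressor ever sees them, so the codec cannot observe a short read. Below is a minimal self-contained sketch of the same pattern, with java.util.zip.InflaterInputStream standing in for HBase's pluggable ValueCompressor; the class name, method name, and choice of DEFLATE are illustrative assumptions, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.InflaterInputStream;

public final class BufferThenDecompress {
  /**
   * Read exactly compressedLen bytes from the (possibly still growing) WAL
   * stream, then decompress from the in-memory copy. The codec can no longer
   * observe a short read: readFully blocks until all bytes arrive, or throws
   * EOFException if the stream truly ends early.
   */
  static int decompressValue(InputStream in, int compressedLen, byte[] out,
      int outOffset, int expectedLength) throws IOException {
    byte[] buffer = new byte[compressedLen];
    new DataInputStream(in).readFully(buffer, 0, compressedLen);
    try (InputStream decompressing =
        new InflaterInputStream(new ByteArrayInputStream(buffer))) {
      int total = 0;
      while (total < expectedLength) {
        int n = decompressing.read(out, outOffset + total, expectedLength - total);
        if (n < 0) {
          break; // caller detects total != expectedLength and raises an error
        }
        total += n;
      }
      return total;
    }
  }
}

The patch itself keeps the codec's existing decompress() signature and uses Hadoop's IOUtils.readFully, so the only behavioral change is that decompression starts from a fully materialized buffer.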
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
new file mode 100644
index 000000000000..e76603ea8bb4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+public class TestReplicationCompressedWAL extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationCompressedWAL.class);
+
+  static final Logger LOG = LoggerFactory.getLogger(TestReplicationCompressedWAL.class);
+  static final int NUM_BATCHES = 100;
+  static final int NUM_ROWS_PER_BATCH = 100;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    TestReplicationBase.setUpBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  @Test
+  public void testMultiplePuts() throws Exception {
+    runMultiplePutTest();
+  }
+
+  protected static void runMultiplePutTest() throws IOException, InterruptedException {
+    for (int i = 0; i < NUM_BATCHES; i++) {
+      putBatch(i);
+      getBatch(i);
+    } 
+  }
+
+  protected static void getBatch(int batch) throws IOException, InterruptedException {
+    for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
+      byte[] row = getRowKey(batch, i);
+      Get get = new Get(row);
+      for (int j = 0; j < NB_RETRIES; j++) {
+        if (j == NB_RETRIES - 1) {
+          fail("Waited too much time for replication");
+        }
+        Result res = htable2.get(get);
+        if (res.isEmpty()) {
+          LOG.info("Row not available");
+          Thread.sleep(SLEEP_TIME);
+        } else {
+          assertArrayEquals(row, res.value());
+          break;
+        }
+      }
+    }
+  }
+
+  protected static byte[] getRowKey(int batch, int count) {
+    return Bytes.toBytes("row" + ((batch * NUM_ROWS_PER_BATCH) + count));
+  }
+
+  protected static void putBatch(int batch) throws IOException {
+    for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
+      byte[] row = getRowKey(batch, i);
+      Put put = new Put(row);
+      put.addColumn(famName, row, row);
+      htable1.put(put);
+    }
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java
new file mode 100644
index 000000000000..cba645533dfd
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+public class TestReplicationValueCompressedWAL extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationValueCompressedWAL.class);
+
+  static final Logger LOG = LoggerFactory.getLogger(TestReplicationValueCompressedWAL.class);
+  static final int NUM_ROWS = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    CONF1.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+    TestReplicationBase.setUpBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  @Test
+  public void testMultiplePuts() throws Exception {
+    TestReplicationCompressedWAL.runMultiplePutTest();
+  }
+
+}
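The two tests differ only in the WAL configuration installed before the mini clusters start: TestReplicationCompressedWAL enables dictionary compression alone, while TestReplicationValueCompressedWAL layers value compression on top, the path that fails without the WALCellCodec change. A sketch of the flags involved, mirroring the setUpBeforeClass() bodies above; the wrapper class and method name are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;

public final class WalCompressionConfig {
  static Configuration walValueCompressionConf() {
    Configuration conf = HBaseConfiguration.create();
    // Dictionary compression of WAL entries (the long-standing feature).
    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
    // Compress cell values as well; this is the code path the buffered
    // read in WALCellCodec protects.
    conf.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
    return conf;
  }
}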
From c2a3a90172a0f1d0180631e1b86196e550b6a83c Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Thu, 10 Jun 2021 12:59:02 -0700
Subject: [PATCH 2/6] Remove unused import

---
 .../org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 0baf75d5a617..d5f0cbf0f01b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -21,7 +21,6 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InterruptedIOException;
 import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;

From 380a7e71c5ac197766417ad3904aa4e89d41253f Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Thu, 10 Jun 2021 13:35:08 -0700
Subject: [PATCH 3/6] Reduce test running times so the MediumTest annotation
 is proper

---
 .../replication/regionserver/TestReplicationCompressedWAL.java | 2 +-
 .../regionserver/TestReplicationValueCompressedWAL.java        | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
index e76603ea8bb4..471ee1c44eab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
@@ -46,7 +46,7 @@ public class TestReplicationCompressedWAL extends TestReplicationBase {
       HBaseClassTestRule.forClass(TestReplicationCompressedWAL.class);
 
   static final Logger LOG = LoggerFactory.getLogger(TestReplicationCompressedWAL.class);
-  static final int NUM_BATCHES = 100;
+  static final int NUM_BATCHES = 20;
   static final int NUM_ROWS_PER_BATCH = 100;
 
   @BeforeClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java
index cba645533dfd..00bf7dc26f11 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java
@@ -38,7 +38,6 @@ public class TestReplicationValueCompressedWAL extends TestReplicationBase {
       HBaseClassTestRule.forClass(TestReplicationValueCompressedWAL.class);
 
   static final Logger LOG = LoggerFactory.getLogger(TestReplicationValueCompressedWAL.class);
-  static final int NUM_ROWS = 10;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
From 5919d2109c619bdd0cd241933fe0bd4c9fa6bc88 Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Sun, 13 Jun 2021 13:22:20 -0700
Subject: [PATCH 4/6] Just ignore the lower stream's available() in
 BoundedDelegatingInputStream#available()

---
 .../hbase/io/BoundedDelegatingInputStream.java |  3 +--
 .../hbase/regionserver/wal/WALCellCodec.java   | 17 ++++-------------
 2 files changed, 5 insertions(+), 15 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
index c7002114099b..cecd047e0359 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
@@ -104,8 +104,7 @@ public int available() throws IOException {
     if (pos >= limit) {
       return 0;
     }
-    int available = in.available();
-    return (int) Math.min(available, limit - pos);
+    return (int) (limit - pos);
   }
 
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index d5f0cbf0f01b..31eccc7a18af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -259,14 +258,11 @@ public void write(Cell cell) throws IOException {
         // Write tags using Dictionary compression
         PrivateCellUtil.compressTags(out, cell, compression.tagCompressionContext);
       } else {
-        // Tag compression is disabled within the WAL compression. Just write the tags bytes
-        // as it is.
+        // Tag compression is disabled within the WAL compression. Just write the tags bytes as
+        // it is.
         PrivateCellUtil.writeTags(out, cell, tagsLength);
       }
     }
-    // Flush the output stream now to minimize the chance a reader actively tailing the
-    // WAL will encounter a short read.
-    out.flush();
   }
 
   private void writeCompressedValue(OutputStream out, Cell cell) throws IOException {
@@ -385,13 +381,8 @@ private static void checkLength(int len, int max) throws IOException {
   private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
       int expectedLength) throws IOException {
     int compressedLen = StreamUtils.readRawVarint32(in);
-    // A partial read of the compressed bytes, depending on which compression codec is used,
-    // can cause messy IO errors. This can happen when the reader is actively tailing a file
-    // being written, for replication.
-    byte[] buffer = new byte[compressedLen];
-    IOUtils.readFully(in, buffer, 0, compressedLen);
-    int read = compression.getValueCompressor().decompress(new ByteArrayInputStream(buffer),
-      compressedLen, outArray, outOffset, expectedLength);
+    int read = compression.getValueCompressor().decompress(in, compressedLen, outArray,
+      outOffset, expectedLength);
     if (read != expectedLength) {
       throw new IOException("ValueCompressor state error: short read");
     }
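Patch 4 abandons the per-value buffering of patch 1 in favor of a fix one layer down: the bounded view now answers available() with the remaining bound, limit - pos, and never consults the delegate. A codec polling available() therefore sees the byte count it was promised even while the underlying file is still being written. A simplified analogue of the revised contract follows; BoundedStream is a toy stand-in, not the real BoundedDelegatingInputStream, which also bounds skip() and other methods omitted here.

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Simplified analogue of BoundedDelegatingInputStream's revised contract. */
class BoundedStream extends FilterInputStream {
  private final long limit;
  private long pos;

  BoundedStream(InputStream in, long limit) {
    super(in);
    this.limit = limit;
  }

  @Override
  public int read() throws IOException {
    if (pos >= limit) {
      return -1; // the bound, not the delegate, decides where this view ends
    }
    int b = in.read();
    if (b >= 0) {
      pos++;
    }
    return b;
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    if (pos >= limit) {
      return -1;
    }
    int n = in.read(b, off, (int) Math.min(len, limit - pos));
    if (n > 0) {
      pos += n;
    }
    return n;
  }

  @Override
  public int available() {
    // Promise the remaining bound. The delegate's own available() may report
    // 0 for a file still being written, and a codec polling it would mistake
    // that for EOF mid-value.
    return (int) Math.min(Integer.MAX_VALUE, limit - pos);
  }
}

This is why the patch can revert the WALCellCodec changes wholesale: with the bound as the source of truth, the original streaming decompress() call is safe again.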
From ec6eecd719537f4c1162ab8583e4224179ee53e9 Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Mon, 14 Jun 2021 17:02:48 -0700
Subject: [PATCH 5/6] Update javadoc and add comment

---
 .../hadoop/hbase/io/BoundedDelegatingInputStream.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
index cecd047e0359..d7db6a348354 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
@@ -95,8 +95,7 @@ public long skip(final long len) throws IOException {
   }
 
   /**
-   * Call the delegate's {@code available()} method.
-   * @return the delegate's available bytes if the current position is less than the
+   * @return the remaining bytes within the bound if the current position is less than the
    *   limit, or 0 otherwise.
    */
   @Override
@@ -104,6 +103,12 @@ public int available() throws IOException {
     if (pos >= limit) {
       return 0;
     }
+    // Do not call the delegate's available() method. Data in a bounded input stream is assumed
+    // available up to the limit and that is the contract we have with our callers. Regardless
+    // of what we do here, read() and skip() will behave as expected when EOF is encountered if
+    // the underlying stream is closed early or otherwise could not provide enough bytes.
+    // Note: This class is used to supply buffers to compression codecs during WAL tailing and
+    // successful decompression depends on this behavior.
     return (int) (limit - pos);
   }
 
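The comment added here pins down why the delegate is not consulted. Reusing the BoundedStream sketch from above, a delegate that understates its availability, as a stream tailing a file under active write may, no longer leaks that answer through the bounded view; the anonymous subclass below is purely illustrative.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public final class AvailableContractDemo {
  public static void main(String[] args) throws IOException {
    InputStream delegate = new ByteArrayInputStream(new byte[100]) {
      @Override
      public synchronized int available() {
        return 0; // emulate a tailing stream that understates what it holds
      }
    };
    BoundedStream bounded = new BoundedStream(delegate, 64);
    System.out.println(bounded.available()); // 64: the remaining bound
  }
}

Had available() still deferred to the delegate, the same call would print 0, and a polling codec could treat a half-written value as end of stream.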
From 26189067133081eedda6383a477980c95bd5d2fc Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Mon, 14 Jun 2021 17:03:30 -0700
Subject: [PATCH 6/6] Fix whitespace

---
 .../replication/regionserver/TestReplicationCompressedWAL.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
index 471ee1c44eab..62fc4a3a90e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
@@ -69,7 +69,7 @@ protected static void runMultiplePutTest() throws IOException, InterruptedExcept
     for (int i = 0; i < NUM_BATCHES; i++) {
       putBatch(i);
       getBatch(i);
-    } 
+    }
   }
 
   protected static void getBatch(int batch) throws IOException, InterruptedException {