From 4cf793d0a59af432c307169cbc863ca100fd1730 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Tue, 7 Nov 2023 14:59:58 +0800 Subject: [PATCH 1/9] Add slow sync log rolling test in TestAsyncLogRolling. --- .../hbase/regionserver/wal/AbstractFSWAL.java | 8 ++ .../hadoop/hbase/regionserver/wal/FSHLog.java | 8 -- .../regionserver/wal/TestAsyncLogRolling.java | 128 ++++++++++++++++++ 3 files changed, 136 insertions(+), 8 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index acf3231d4e90..edf3fed89893 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -2245,6 +2245,14 @@ private static void split(final Configuration conf, final Path p) throws IOExcep WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); } + W getWriter() { + return this.writer; + } + + void setWriter(W writer) { + this.writer = writer; + } + private static void usage() { System.err.println("Usage: AbstractFSWAL "); System.err.println("Arguments:"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index d0d5ce5f2e17..131f284557af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -603,14 +603,6 @@ DatanodeInfo[] getPipeline() { return new DatanodeInfo[0]; } - Writer getWriter() { - return this.writer; - } - - void setWriter(Writer writer) { - this.writer = writer; - } - @Override protected Writer createCombinedWriter(Writer localWriter, Writer remoteWriter) { // put remote writer first as usually it will cost more time to finish, so we write to it first diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index 9dc27a693a7f..5211df68ec91 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -18,17 +18,29 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.junit.BeforeClass; @@ -49,6 +61,122 @@ public static void setUpBeforeClass() throws Exception { conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100); conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); AbstractTestLogRolling.setUpBeforeClass(); + + // For slow sync threshold test: roll once after a sync above this threshold + TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000); + } + + @Test + public void testSlowSyncLogRolling() throws Exception { + // Create the test table + TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + admin.createTable(desc); + Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); + int row = 1; + try { + // Get a reference to the AsyncFSWAL + server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); + RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); + final AsyncFSWAL log = (AsyncFSWAL) server.getWAL(region); + + // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested + + final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); + log.registerWALActionsListener(new WALActionsListener() { + @Override + public void logRollRequested(WALActionsListener.RollRequestReason reason) { + switch (reason) { + case SLOW_SYNC: + slowSyncHookCalled.lazySet(true); + break; + default: + break; + } + } + }); + + // Write some data + + for (int i = 0; i < 10; i++) { + writeData(table, row++); + } + + assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + + // Set up for test + slowSyncHookCalled.set(false); + + // Wrap the current writer with the anonymous class below that adds 5000 ms of + // latency to any sync on the hlog. + // This will trip the other threshold. + final WALProvider.AsyncWriter oldWriter2 = log.getWriter(); + final WALProvider.AsyncWriter newWriter2 = new WALProvider.AsyncWriter() { + @Override + public void close() throws IOException { + oldWriter2.close(); + } + + @Override + public CompletableFuture sync(boolean forceSync) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return oldWriter2.sync(forceSync); + } + + @Override + public void append(WAL.Entry entry) { + oldWriter2.append(entry); + } + + @Override + public long getLength() { + return oldWriter2.getLength(); + } + + @Override + public long getSyncedLength() { + return oldWriter2.getSyncedLength(); + } + }; + log.setWriter(newWriter2); + + // Write some data. Should only take one sync. + + writeData(table, row++); + + // Wait for our wait injecting writer to get rolled out, as needed. + + TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + return log.getWriter() != newWriter2; + } + + @Override + public String explainFailure() throws Exception { + return "Waited too long for our test writer to get rolled out"; + } + }); + + assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + + // Set up for test + slowSyncHookCalled.set(false); + + // Write some data + for (int i = 0; i < 10; i++) { + writeData(table, row++); + } + + assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + + } finally { + table.close(); + } } @Test From 67dd653fec8a08885bd9682e92cecedcb8806b8e Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sat, 11 Nov 2023 21:36:10 +0800 Subject: [PATCH 2/9] Add slow sync log rolling test in TestAsyncLogRolling. --- .../hbase/regionserver/wal/AbstractFSWAL.java | 1 + .../wal/AbstractTestLogRolling.java | 133 ++++++++++- .../regionserver/wal/TestAsyncLogRolling.java | 145 +++--------- .../regionserver/wal/TestLogRolling.java | 208 +++--------------- 4 files changed, 190 insertions(+), 297 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index edf3fed89893..52151093bf46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -2249,6 +2249,7 @@ W getWriter() { return this.writer; } + // Only for test. void setWriter(W writer) { this.writer = writer; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 940dbebf614b..b72c9ba79c15 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -20,9 +20,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseTestingUtil; @@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.SingleProcessHBaseCluster; import org.apache.hadoop.hbase.StartTestingClusterOption; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Get; @@ -48,6 +51,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.Assert; @@ -62,7 +66,7 @@ /** * Test log deletion as logs are rolled. */ -public abstract class AbstractTestLogRolling { +public abstract class AbstractTestLogRolling { private static final Logger LOG = LoggerFactory.getLogger(AbstractTestLogRolling.class); protected HRegionServer server; protected String tableName; @@ -118,6 +122,12 @@ public static void setUpBeforeClass() throws Exception { // disable low replication check for log roller to get a more stable result // TestWALOpenAfterDNRollingStart will test this option. conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000); + + // For slow sync threshold test: roll after 5 slow syncs in 10 seconds + conf.setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5); + conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000); + // For slow sync threshold test: roll once after a sync above this threshold + conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000); } @Before @@ -158,6 +168,127 @@ private void startAndWriteData() throws IOException, InterruptedException { } } + @Test + public void testSlowSyncLogRolling() throws Exception { + // Create the test table + TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + admin.createTable(desc); + Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); + int row = 1; + try { + // Get a reference to the FSHLog + server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); + RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); + final AbstractFSWAL log = (AbstractFSWAL) server.getWAL(region); + + // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested + + final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); + log.registerWALActionsListener(new WALActionsListener() { + @Override + public void logRollRequested(WALActionsListener.RollRequestReason reason) { + switch (reason) { + case SLOW_SYNC: + slowSyncHookCalled.lazySet(true); + break; + default: + break; + } + } + }); + + // Write some data + + for (int i = 0; i < 10; i++) { + writeData(table, row++); + } + + assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + + // Only test for FSHLog. + if ("filesystem".equals(TEST_UTIL.getConfiguration().get(WALFactory.WAL_PROVIDER))) { + // Set up for test + slowSyncHookCalled.set(false); + + // Wrap the current writer with the anonymous class below that adds 200 ms of + // latency to any sync on the hlog. This should be more than sufficient to trigger + // slow sync warnings. + final W oldWriter1 = log.getWriter(); + final W newWriter1 = createNewWriter(oldWriter1, 200); + log.setWriter(newWriter1); + + // Write some data. + // We need to write at least 5 times, but double it. We should only request + // a SLOW_SYNC roll once in the current interval. + for (int i = 0; i < 10; i++) { + writeData(table, row++); + } + + // Wait for our wait injecting writer to get rolled out, as needed. + + TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + return log.getWriter() != newWriter1; + } + + @Override + public String explainFailure() throws Exception { + return "Waited too long for our test writer to get rolled out"; + } + }); + + assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + } + + // Set up for test + slowSyncHookCalled.set(false); + + // Wrap the current writer with the anonymous class below that adds 5000 ms of + // latency to any sync on the hlog. + // This will trip the other threshold. + final W oldWriter2 = log.getWriter(); + final W newWriter2 = createNewWriter(oldWriter2, 5000); + log.setWriter(newWriter2); + + // Write some data. Should only take one sync. + + writeData(table, row++); + + // Wait for our wait injecting writer to get rolled out, as needed. + + TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { + @Override + public boolean evaluate() throws Exception { + return log.getWriter() != newWriter2; + } + + @Override + public String explainFailure() throws Exception { + return "Waited too long for our test writer to get rolled out"; + } + }); + + assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + + // Set up for test + slowSyncHookCalled.set(false); + + // Write some data + for (int i = 0; i < 10; i++) { + writeData(table, row++); + } + + assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + + } finally { + table.close(); + } + } + + protected abstract W createNewWriter(W oldWriter, int sleepTimeMillis); + /** * Tests that log rolling doesn't hang when no data is written. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index 5211df68ec91..fd493bae2478 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -18,22 +18,13 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; @@ -49,7 +40,7 @@ import org.junit.experimental.categories.Category; @Category({ VerySlowRegionServerTests.class, LargeTests.class }) -public class TestAsyncLogRolling extends AbstractTestLogRolling { +public class TestAsyncLogRolling extends AbstractTestLogRolling { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -61,122 +52,42 @@ public static void setUpBeforeClass() throws Exception { conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100); conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); AbstractTestLogRolling.setUpBeforeClass(); - - // For slow sync threshold test: roll once after a sync above this threshold - TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000); } - @Test - public void testSlowSyncLogRolling() throws Exception { - // Create the test table - TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); - admin.createTable(desc); - Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); - int row = 1; - try { - // Get a reference to the AsyncFSWAL - server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); - RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); - final AsyncFSWAL log = (AsyncFSWAL) server.getWAL(region); - - // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested - - final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); - log.registerWALActionsListener(new WALActionsListener() { - @Override - public void logRollRequested(WALActionsListener.RollRequestReason reason) { - switch (reason) { - case SLOW_SYNC: - slowSyncHookCalled.lazySet(true); - break; - default: - break; - } - } - }); - - // Write some data - - for (int i = 0; i < 10; i++) { - writeData(table, row++); + @Override + protected WALProvider.AsyncWriter createNewWriter(WALProvider.AsyncWriter oldWriter, + int sleepTimeMillis) { + return new WALProvider.AsyncWriter() { + @Override + public void close() throws IOException { + oldWriter.close(); } - assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); - - // Set up for test - slowSyncHookCalled.set(false); - - // Wrap the current writer with the anonymous class below that adds 5000 ms of - // latency to any sync on the hlog. - // This will trip the other threshold. - final WALProvider.AsyncWriter oldWriter2 = log.getWriter(); - final WALProvider.AsyncWriter newWriter2 = new WALProvider.AsyncWriter() { - @Override - public void close() throws IOException { - oldWriter2.close(); - } - - @Override - public CompletableFuture sync(boolean forceSync) { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return oldWriter2.sync(forceSync); - } - - @Override - public void append(WAL.Entry entry) { - oldWriter2.append(entry); - } - - @Override - public long getLength() { - return oldWriter2.getLength(); + @Override + public CompletableFuture sync(boolean forceSync) { + try { + Thread.sleep(sleepTimeMillis); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + return oldWriter.sync(forceSync); + } - @Override - public long getSyncedLength() { - return oldWriter2.getSyncedLength(); - } - }; - log.setWriter(newWriter2); - - // Write some data. Should only take one sync. - - writeData(table, row++); - - // Wait for our wait injecting writer to get rolled out, as needed. - - TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { - @Override - public boolean evaluate() throws Exception { - return log.getWriter() != newWriter2; - } - - @Override - public String explainFailure() throws Exception { - return "Waited too long for our test writer to get rolled out"; - } - }); - - assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); - - // Set up for test - slowSyncHookCalled.set(false); - - // Write some data - for (int i = 0; i < 10; i++) { - writeData(table, row++); + @Override + public void append(WAL.Entry entry) { + oldWriter.append(entry); } - assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + @Override + public long getLength() { + return oldWriter.getLength(); + } - } finally { - table.close(); - } + @Override + public long getSyncedLength() { + return oldWriter.getSyncedLength(); + } + }; } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index f07a02cb25d1..96246bc48f13 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; @@ -59,7 +58,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALProvider.Writer; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALStreamReader; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -71,7 +70,7 @@ import org.slf4j.LoggerFactory; @Category({ VerySlowRegionServerTests.class, LargeTests.class }) -public class TestLogRolling extends AbstractTestLogRolling { +public class TestLogRolling extends AbstractTestLogRolling { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -98,192 +97,43 @@ public static void setUpBeforeClass() throws Exception { conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3); conf.set(WALFactory.WAL_PROVIDER, "filesystem"); AbstractTestLogRolling.setUpBeforeClass(); - - // For slow sync threshold test: roll after 5 slow syncs in 10 seconds - TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5); - TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000); - // For slow sync threshold test: roll once after a sync above this threshold - TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000); } - @Test - public void testSlowSyncLogRolling() throws Exception { - // Create the test table - TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); - admin.createTable(desc); - Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); - int row = 1; - try { - // Get a reference to the FSHLog - server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); - RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); - final FSHLog log = (FSHLog) server.getWAL(region); - - // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested - - final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); - log.registerWALActionsListener(new WALActionsListener() { - @Override - public void logRollRequested(WALActionsListener.RollRequestReason reason) { - switch (reason) { - case SLOW_SYNC: - slowSyncHookCalled.lazySet(true); - break; - default: - break; - } - } - }); - - // Write some data - - for (int i = 0; i < 10; i++) { - writeData(table, row++); + @Override + protected WALProvider.Writer createNewWriter(WALProvider.Writer oldWriter, int sleepTimeMillis) { + return new WALProvider.Writer() { + @Override + public void close() throws IOException { + oldWriter.close(); } - assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); - - // Set up for test - slowSyncHookCalled.set(false); - - // Wrap the current writer with the anonymous class below that adds 200 ms of - // latency to any sync on the hlog. This should be more than sufficient to trigger - // slow sync warnings. - final Writer oldWriter1 = log.getWriter(); - final Writer newWriter1 = new Writer() { - @Override - public void close() throws IOException { - oldWriter1.close(); - } - - @Override - public void sync(boolean forceSync) throws IOException { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - InterruptedIOException ex = new InterruptedIOException(); - ex.initCause(e); - throw ex; - } - oldWriter1.sync(forceSync); - } - - @Override - public void append(Entry entry) throws IOException { - oldWriter1.append(entry); - } - - @Override - public long getLength() { - return oldWriter1.getLength(); - } - - @Override - public long getSyncedLength() { - return oldWriter1.getSyncedLength(); + @Override + public void sync(boolean forceSync) throws IOException { + try { + Thread.sleep(sleepTimeMillis); + } catch (InterruptedException e) { + InterruptedIOException ex = new InterruptedIOException(); + ex.initCause(e); + throw ex; } - }; - log.setWriter(newWriter1); - - // Write some data. - // We need to write at least 5 times, but double it. We should only request - // a SLOW_SYNC roll once in the current interval. - for (int i = 0; i < 10; i++) { - writeData(table, row++); + oldWriter.sync(forceSync); } - // Wait for our wait injecting writer to get rolled out, as needed. - - TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { - @Override - public boolean evaluate() throws Exception { - return log.getWriter() != newWriter1; - } - - @Override - public String explainFailure() throws Exception { - return "Waited too long for our test writer to get rolled out"; - } - }); - - assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); - - // Set up for test - slowSyncHookCalled.set(false); - - // Wrap the current writer with the anonymous class below that adds 5000 ms of - // latency to any sync on the hlog. - // This will trip the other threshold. - final Writer oldWriter2 = (Writer) log.getWriter(); - final Writer newWriter2 = new Writer() { - @Override - public void close() throws IOException { - oldWriter2.close(); - } - - @Override - public void sync(boolean forceSync) throws IOException { - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - InterruptedIOException ex = new InterruptedIOException(); - ex.initCause(e); - throw ex; - } - oldWriter2.sync(forceSync); - } - - @Override - public void append(Entry entry) throws IOException { - oldWriter2.append(entry); - } - - @Override - public long getLength() { - return oldWriter2.getLength(); - } - - @Override - public long getSyncedLength() { - return oldWriter2.getSyncedLength(); - } - }; - log.setWriter(newWriter2); - - // Write some data. Should only take one sync. - - writeData(table, row++); - - // Wait for our wait injecting writer to get rolled out, as needed. - - TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { - @Override - public boolean evaluate() throws Exception { - return log.getWriter() != newWriter2; - } - - @Override - public String explainFailure() throws Exception { - return "Waited too long for our test writer to get rolled out"; - } - }); - - assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); - - // Set up for test - slowSyncHookCalled.set(false); - - // Write some data - for (int i = 0; i < 10; i++) { - writeData(table, row++); + @Override + public void append(Entry entry) throws IOException { + oldWriter.append(entry); } - assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + @Override + public long getLength() { + return oldWriter.getLength(); + } - } finally { - table.close(); - } + @Override + public long getSyncedLength() { + return oldWriter.getSyncedLength(); + } + }; } void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout) From bfe7f7fd26b629efef318929ced83d41a6e481e9 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sun, 12 Nov 2023 22:42:37 +0800 Subject: [PATCH 3/9] Add slow sync log rolling test in TestAsyncLogRolling. --- .../hbase/regionserver/wal/AbstractFSWAL.java | 5 -- .../wal/AbstractTestLogRolling.java | 60 +++++++++++-------- .../regionserver/wal/TestAsyncLogRolling.java | 56 ++++++++--------- .../regionserver/wal/TestLogRolling.java | 56 +++++++---------- 4 files changed, 81 insertions(+), 96 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 52151093bf46..1a5b5384b01f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -2249,11 +2249,6 @@ W getWriter() { return this.writer; } - // Only for test. - void setWriter(W writer) { - this.writer = writer; - } - private static void usage() { System.err.println("Usage: AbstractFSWAL "); System.err.println("Arguments:"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index b72c9ba79c15..7a42f34838f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -66,7 +66,7 @@ /** * Test log deletion as logs are rolled. */ -public abstract class AbstractTestLogRolling { +public abstract class AbstractTestLogRolling { private static final Logger LOG = LoggerFactory.getLogger(AbstractTestLogRolling.class); protected HRegionServer server; protected String tableName; @@ -78,6 +78,7 @@ public abstract class AbstractTestLogRolling { protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); @Rule public final TestName name = new TestName(); + private static int syncLatencyMillis; public AbstractTestLogRolling() { this.server = null; @@ -168,6 +169,14 @@ private void startAndWriteData() throws IOException, InterruptedException { } } + public static void setSyncLatencyMillis(int latency) { + syncLatencyMillis = latency; + } + + public static int getSyncLatencyMillis() { + return syncLatencyMillis; + } + @Test public void testSlowSyncLogRolling() throws Exception { // Create the test table @@ -177,14 +186,13 @@ public void testSlowSyncLogRolling() throws Exception { Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); int row = 1; try { - // Get a reference to the FSHLog server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); - final AbstractFSWAL log = (AbstractFSWAL) server.getWAL(region); - - // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested + // Get a reference to the wal. + final AbstractFSWAL log = (AbstractFSWAL) server.getWAL(region); final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); + // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested log.registerWALActionsListener(new WALActionsListener() { @Override public void logRollRequested(WALActionsListener.RollRequestReason reason) { @@ -199,7 +207,6 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) { }); // Write some data - for (int i = 0; i < 10; i++) { writeData(table, row++); } @@ -208,15 +215,17 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) { // Only test for FSHLog. if ("filesystem".equals(TEST_UTIL.getConfiguration().get(WALFactory.WAL_PROVIDER))) { + + // Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to + // trigger slow sync warnings. + setSyncLatencyMillis(200); + setSlowLogWriter(log.conf); + log.rollWriter(true); + // Set up for test slowSyncHookCalled.set(false); - // Wrap the current writer with the anonymous class below that adds 200 ms of - // latency to any sync on the hlog. This should be more than sufficient to trigger - // slow sync warnings. - final W oldWriter1 = log.getWriter(); - final W newWriter1 = createNewWriter(oldWriter1, 200); - log.setWriter(newWriter1); + final WALProvider.WriterBase oldWriter1 = log.getWriter(); // Write some data. // We need to write at least 5 times, but double it. We should only request @@ -226,11 +235,10 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) { } // Wait for our wait injecting writer to get rolled out, as needed. - TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { @Override public boolean evaluate() throws Exception { - return log.getWriter() != newWriter1; + return log.getWriter() != oldWriter1; } @Override @@ -242,26 +250,24 @@ public String explainFailure() throws Exception { assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); } + // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold. + setSyncLatencyMillis(5000); + setSlowLogWriter(log.conf); + log.rollWriter(true); + // Set up for test slowSyncHookCalled.set(false); - // Wrap the current writer with the anonymous class below that adds 5000 ms of - // latency to any sync on the hlog. - // This will trip the other threshold. - final W oldWriter2 = log.getWriter(); - final W newWriter2 = createNewWriter(oldWriter2, 5000); - log.setWriter(newWriter2); + final WALProvider.WriterBase oldWriter2 = log.getWriter(); // Write some data. Should only take one sync. - writeData(table, row++); // Wait for our wait injecting writer to get rolled out, as needed. - TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { @Override public boolean evaluate() throws Exception { - return log.getWriter() != newWriter2; + return log.getWriter() != oldWriter2; } @Override @@ -272,6 +278,10 @@ public String explainFailure() throws Exception { assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); + // Set default log writer, no additional latency to any sync on the hlog. + setDefaultLogWriter(log.conf); + log.rollWriter(true); + // Set up for test slowSyncHookCalled.set(false); @@ -287,7 +297,9 @@ public String explainFailure() throws Exception { } } - protected abstract W createNewWriter(W oldWriter, int sleepTimeMillis); + protected abstract void setSlowLogWriter(Configuration conf); + + protected abstract void setDefaultLogWriter(Configuration conf); /** * Tests that log rolling doesn't hang when no data is written. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index fd493bae2478..552dac10f280 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -29,9 +29,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.junit.BeforeClass; @@ -39,8 +37,11 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + @Category({ VerySlowRegionServerTests.class, LargeTests.class }) -public class TestAsyncLogRolling extends AbstractTestLogRolling { +public class TestAsyncLogRolling extends AbstractTestLogRolling { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -54,40 +55,31 @@ public static void setUpBeforeClass() throws Exception { AbstractTestLogRolling.setUpBeforeClass(); } - @Override - protected WALProvider.AsyncWriter createNewWriter(WALProvider.AsyncWriter oldWriter, - int sleepTimeMillis) { - return new WALProvider.AsyncWriter() { - @Override - public void close() throws IOException { - oldWriter.close(); - } + public static class SlowSyncLogWriter extends AsyncProtobufLogWriter { - @Override - public CompletableFuture sync(boolean forceSync) { - try { - Thread.sleep(sleepTimeMillis); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return oldWriter.sync(forceSync); - } + public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class channelClass) { + super(eventLoopGroup, channelClass); + } - @Override - public void append(WAL.Entry entry) { - oldWriter.append(entry); + @Override + public CompletableFuture sync(boolean forceSync) { + try { + Thread.sleep(getSyncLatencyMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + return super.sync(forceSync); + } + } - @Override - public long getLength() { - return oldWriter.getLength(); - } + @Override + protected void setSlowLogWriter(Configuration conf) { + conf.set(AsyncFSWALProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName()); + } - @Override - public long getSyncedLength() { - return oldWriter.getSyncedLength(); - } - }; + @Override + protected void setDefaultLogWriter(Configuration conf) { + conf.set(AsyncFSWALProvider.WRITER_IMPL, AsyncProtobufLogWriter.class.getName()); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 96246bc48f13..1a7f53c1ec45 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -55,10 +55,9 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALStreamReader; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; @@ -70,7 +69,7 @@ import org.slf4j.LoggerFactory; @Category({ VerySlowRegionServerTests.class, LargeTests.class }) -public class TestLogRolling extends AbstractTestLogRolling { +public class TestLogRolling extends AbstractTestLogRolling { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -99,41 +98,28 @@ public static void setUpBeforeClass() throws Exception { AbstractTestLogRolling.setUpBeforeClass(); } - @Override - protected WALProvider.Writer createNewWriter(WALProvider.Writer oldWriter, int sleepTimeMillis) { - return new WALProvider.Writer() { - @Override - public void close() throws IOException { - oldWriter.close(); - } - - @Override - public void sync(boolean forceSync) throws IOException { - try { - Thread.sleep(sleepTimeMillis); - } catch (InterruptedException e) { - InterruptedIOException ex = new InterruptedIOException(); - ex.initCause(e); - throw ex; - } - oldWriter.sync(forceSync); - } - - @Override - public void append(Entry entry) throws IOException { - oldWriter.append(entry); + public static class SlowSyncLogWriter extends ProtobufLogWriter { + @Override + public void sync(boolean forceSync) throws IOException { + try { + Thread.sleep(getSyncLatencyMillis()); + } catch (InterruptedException e) { + InterruptedIOException ex = new InterruptedIOException(); + ex.initCause(e); + throw ex; } + super.sync(forceSync); + } + } - @Override - public long getLength() { - return oldWriter.getLength(); - } + @Override + protected void setSlowLogWriter(Configuration conf) { + conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName()); + } - @Override - public long getSyncedLength() { - return oldWriter.getSyncedLength(); - } - }; + @Override + protected void setDefaultLogWriter(Configuration conf) { + conf.set(FSHLogProvider.WRITER_IMPL, ProtobufLogWriter.class.getName()); } void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout) From 4fd0dc40c7873daf023678e4ff932ee3f6ea2cbe Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Mon, 13 Nov 2023 10:17:05 +0800 Subject: [PATCH 4/9] Add slow sync log rolling test in TestAsyncLogRolling. --- .../regionserver/wal/AbstractTestLogRolling.java | 14 +++----------- .../regionserver/wal/TestAsyncLogRolling.java | 2 +- .../hbase/regionserver/wal/TestLogRolling.java | 2 +- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 7a42f34838f5..2c23457c44a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -78,7 +78,7 @@ public abstract class AbstractTestLogRolling { protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); @Rule public final TestName name = new TestName(); - private static int syncLatencyMillis; + protected static int syncLatencyMillis; public AbstractTestLogRolling() { this.server = null; @@ -169,23 +169,18 @@ private void startAndWriteData() throws IOException, InterruptedException { } } - public static void setSyncLatencyMillis(int latency) { + private static void setSyncLatencyMillis(int latency) { syncLatencyMillis = latency; } - public static int getSyncLatencyMillis() { - return syncLatencyMillis; - } - @Test public void testSlowSyncLogRolling() throws Exception { // Create the test table TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); admin.createTable(desc); - Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); int row = 1; - try { + try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) { server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); // Get a reference to the wal. @@ -291,9 +286,6 @@ public String explainFailure() throws Exception { } assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); - - } finally { - table.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index 552dac10f280..66ea6a94fcd8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -64,7 +64,7 @@ public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class @Override public CompletableFuture sync(boolean forceSync) { try { - Thread.sleep(getSyncLatencyMillis()); + Thread.sleep(syncLatencyMillis); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 1a7f53c1ec45..45baa27ba11a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -102,7 +102,7 @@ public static class SlowSyncLogWriter extends ProtobufLogWriter { @Override public void sync(boolean forceSync) throws IOException { try { - Thread.sleep(getSyncLatencyMillis()); + Thread.sleep(syncLatencyMillis); } catch (InterruptedException e) { InterruptedIOException ex = new InterruptedIOException(); ex.initCause(e); From b2d8cccab8e7b9261f791f0d5961a05ff077ffd4 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sun, 19 Nov 2023 21:40:38 +0800 Subject: [PATCH 5/9] Add slow sync log rolling test in TestAsyncLogRolling. --- .../wal/AbstractTestLogRolling.java | 141 ++++++------------ .../regionserver/wal/TestAsyncLogRolling.java | 28 ++++ .../regionserver/wal/TestLogRolling.java | 30 ++++ 3 files changed, 100 insertions(+), 99 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 2c23457c44a8..ec1117b30dc1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -79,6 +79,8 @@ public abstract class AbstractTestLogRolling { @Rule public final TestName name = new TestName(); protected static int syncLatencyMillis; + private static int rowNum = 1; + private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); public AbstractTestLogRolling() { this.server = null; @@ -173,96 +175,52 @@ private static void setSyncLatencyMillis(int latency) { syncLatencyMillis = latency; } - @Test - public void testSlowSyncLogRolling() throws Exception { - // Create the test table - TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); - admin.createTable(desc); - int row = 1; - try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) { - server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); - RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); - // Get a reference to the wal. - final AbstractFSWAL log = (AbstractFSWAL) server.getWAL(region); - - final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); - // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested - log.registerWALActionsListener(new WALActionsListener() { - @Override - public void logRollRequested(WALActionsListener.RollRequestReason reason) { - switch (reason) { - case SLOW_SYNC: - slowSyncHookCalled.lazySet(true); - break; - default: - break; - } - } - }); - - // Write some data - for (int i = 0; i < 10; i++) { - writeData(table, row++); - } - - assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); - - // Only test for FSHLog. - if ("filesystem".equals(TEST_UTIL.getConfiguration().get(WALFactory.WAL_PROVIDER))) { - - // Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to - // trigger slow sync warnings. - setSyncLatencyMillis(200); - setSlowLogWriter(log.conf); - log.rollWriter(true); - - // Set up for test - slowSyncHookCalled.set(false); - - final WALProvider.WriterBase oldWriter1 = log.getWriter(); - - // Write some data. - // We need to write at least 5 times, but double it. We should only request - // a SLOW_SYNC roll once in the current interval. - for (int i = 0; i < 10; i++) { - writeData(table, row++); + protected final AbstractFSWAL getWALAndRegisterSlowSyncHook(RegionInfo region) + throws IOException { + // Get a reference to the wal. + final AbstractFSWAL log = (AbstractFSWAL) server.getWAL(region); + + // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested + log.registerWALActionsListener(new WALActionsListener() { + @Override + public void logRollRequested(RollRequestReason reason) { + switch (reason) { + case SLOW_SYNC: + slowSyncHookCalled.lazySet(true); + break; + default: + break; } - - // Wait for our wait injecting writer to get rolled out, as needed. - TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { - @Override - public boolean evaluate() throws Exception { - return log.getWriter() != oldWriter1; - } - - @Override - public String explainFailure() throws Exception { - return "Waited too long for our test writer to get rolled out"; - } - }); - - assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); } + }); + return log; + } - // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold. - setSyncLatencyMillis(5000); + protected final void checkSlowSync(AbstractFSWAL log, Table table, int slowSyncLatency, + int writeCount, boolean slowSync) throws Exception { + if (slowSyncLatency > 0) { + setSyncLatencyMillis(slowSyncLatency); setSlowLogWriter(log.conf); - log.rollWriter(true); + } else { + setDefaultLogWriter(log.conf); + } - // Set up for test - slowSyncHookCalled.set(false); + // Set up for test + log.rollWriter(true); + slowSyncHookCalled.set(false); - final WALProvider.WriterBase oldWriter2 = log.getWriter(); + final WALProvider.WriterBase oldWriter = log.getWriter(); - // Write some data. Should only take one sync. - writeData(table, row++); + // Write some data + for (int i = 0; i < writeCount; i++) { + writeData(table, rowNum++); + } - // Wait for our wait injecting writer to get rolled out, as needed. + if (slowSyncLatency > 0) { TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { @Override public boolean evaluate() throws Exception { - return log.getWriter() != oldWriter2; + return log.getWriter() != oldWriter; } @Override @@ -270,21 +228,11 @@ public String explainFailure() throws Exception { return "Waited too long for our test writer to get rolled out"; } }); + } + if (slowSync) { assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); - - // Set default log writer, no additional latency to any sync on the hlog. - setDefaultLogWriter(log.conf); - log.rollWriter(true); - - // Set up for test - slowSyncHookCalled.set(false); - - // Write some data - for (int i = 0; i < 10; i++) { - writeData(table, row++); - } - + } else { assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); } } @@ -374,12 +322,10 @@ void validateData(Table table, int rownum) throws IOException { */ @Test public void testCompactionRecordDoesntBlockRolling() throws Exception { - Table table = null; // When the hbase:meta table can be opened, the region servers are running - Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); - try { - table = createTestTable(getName()); + try (Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); + Table table = createTestTable(getName())) { server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); HRegion region = server.getRegions(table.getName()).get(0); @@ -421,9 +367,6 @@ public void testCompactionRecordDoesntBlockRolling() throws Exception { log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added. assertEquals("Should have 1 WALs at the end", 1, AbstractFSWALProvider.getNumRolledLogFiles(log)); - } finally { - if (t != null) t.close(); - if (table != null) table.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index 66ea6a94fcd8..a461c28ba5a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -23,8 +23,13 @@ import java.util.concurrent.CompletableFuture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; @@ -82,6 +87,29 @@ protected void setDefaultLogWriter(Configuration conf) { conf.set(AsyncFSWALProvider.WRITER_IMPL, AsyncProtobufLogWriter.class.getName()); } + @Test + public void testSlowSyncLogRolling() throws Exception { + // Create the test table + TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + admin.createTable(desc); + try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) { + server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); + RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); + final AbstractFSWAL log = getWALAndRegisterSlowSyncHook(region); + + // Set default log writer, no additional latency to any sync on the hlog. + checkSlowSync(log, table, -1, 10, false); + + // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold. + // Write some data. Should only take one sync. + checkSlowSync(log, table, 5000, 1, true); + + // Set default log writer, no additional latency to any sync on the hlog. + checkSlowSync(log, table, -1, 10, false); + } + } + @Test public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException { dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 45baa27ba11a..9caa47e8614b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -149,6 +149,36 @@ void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, } } + @Test + public void testSlowSyncLogRolling() throws Exception { + // Create the test table + TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build(); + admin.createTable(desc); + try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) { + server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); + RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); + final AbstractFSWAL log = getWALAndRegisterSlowSyncHook(region); + + // Set default log writer, no additional latency to any sync on the hlog. + checkSlowSync(log, table, -1, 10, false); + + // Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to + // trigger slow sync warnings. + // Write some data. + // We need to write at least 5 times, but double it. We should only request + // a SLOW_SYNC roll once in the current interval. + checkSlowSync(log, table, 200, 10, true); + + // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold. + // Write some data. Should only take one sync. + checkSlowSync(log, table, 5000, 1, true); + + // Set default log writer, no additional latency to any sync on the hlog. + checkSlowSync(log, table, -1, 10, false); + } + } + /** * Tests that logs are rolled upon detecting datanode death Requires an HDFS jar with HDFS-826 & * syncFs() support (HDFS-200) From a38a5ebbf1c7e33b73bd62e4fabcd5db7d9b43b8 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sun, 19 Nov 2023 22:10:56 +0800 Subject: [PATCH 6/9] Add slow sync log rolling test in TestAsyncLogRolling. --- .../hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index ec1117b30dc1..5b0979c8f707 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -216,7 +216,7 @@ protected final void checkSlowSync(AbstractFSWAL log, Table table, int slowSy writeData(table, rowNum++); } - if (slowSyncLatency > 0) { + if (slowSync) { TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate() { @Override public boolean evaluate() throws Exception { @@ -228,9 +228,7 @@ public String explainFailure() throws Exception { return "Waited too long for our test writer to get rolled out"; } }); - } - if (slowSync) { assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); } else { assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get()); From f8dc611e18e1ffbc0e91ba019520080166d31089 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sun, 3 Dec 2023 14:38:18 +0800 Subject: [PATCH 7/9] Add slow sync log rolling test in TestAsyncLogRolling. --- .../regionserver/wal/TestAsyncLogRolling.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index a461c28ba5a1..b9ae7907c04e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -68,12 +68,20 @@ public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class @Override public CompletableFuture sync(boolean forceSync) { - try { - Thread.sleep(syncLatencyMillis); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return super.sync(forceSync); + CompletableFuture future = new CompletableFuture<>(); + super.sync(forceSync).whenCompleteAsync((lengthAfterFlush, error) -> { + if (error != null) { + future.completeExceptionally(error); + } else { + try { + Thread.sleep(syncLatencyMillis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + future.complete(lengthAfterFlush); + } + }); + return future; } } From ceb51c5588d95b9cc5fca143793511d9db807b2d Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sun, 3 Dec 2023 23:46:15 +0800 Subject: [PATCH 8/9] Add slow sync log rolling test in TestAsyncLogRolling. --- .../regionserver/wal/AbstractTestLogRolling.java | 7 +++++++ .../regionserver/wal/TestAsyncLogRolling.java | 16 +++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 5b0979c8f707..bd54147f0a63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -63,6 +65,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Test log deletion as logs are rolled. */ @@ -81,6 +85,9 @@ public abstract class AbstractTestLogRolling { protected static int syncLatencyMillis; private static int rowNum = 1; private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); + protected static final ScheduledExecutorService EXECUTOR = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Slow-sync-%d") + .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); public AbstractTestLogRolling() { this.server = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index b9ae7907c04e..804e93eb8f56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; @@ -70,16 +71,13 @@ public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class public CompletableFuture sync(boolean forceSync) { CompletableFuture future = new CompletableFuture<>(); super.sync(forceSync).whenCompleteAsync((lengthAfterFlush, error) -> { - if (error != null) { - future.completeExceptionally(error); - } else { - try { - Thread.sleep(syncLatencyMillis); - } catch (InterruptedException e) { - throw new RuntimeException(e); + EXECUTOR.schedule(() -> { + if (error != null) { + future.completeExceptionally(error); + } else { + future.complete(lengthAfterFlush); } - future.complete(lengthAfterFlush); - } + }, syncLatencyMillis, TimeUnit.MILLISECONDS); }); return future; } From e867e87f9645bae4a11b94fcc2e2e1e2694d2d5c Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Mon, 11 Dec 2023 17:03:15 +0800 Subject: [PATCH 9/9] Add slow sync log rolling test in TestAsyncLogRolling. --- .../regionserver/wal/AbstractTestLogRolling.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index bd54147f0a63..2a5aec458828 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -85,9 +86,7 @@ public abstract class AbstractTestLogRolling { protected static int syncLatencyMillis; private static int rowNum = 1; private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean(); - protected static final ScheduledExecutorService EXECUTOR = Executors - .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Slow-sync-%d") - .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); + protected static ScheduledExecutorService EXECUTOR; public AbstractTestLogRolling() { this.server = null; @@ -138,6 +137,11 @@ public static void setUpBeforeClass() throws Exception { conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000); // For slow sync threshold test: roll once after a sync above this threshold conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000); + + // Slow sync executor. + EXECUTOR = Executors + .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Slow-sync-%d") + .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); } @Before @@ -159,6 +163,11 @@ public void tearDown() throws Exception { TEST_UTIL.shutdownMiniCluster(); } + @AfterClass + public static void tearDownAfterClass() { + EXECUTOR.shutdownNow(); + } + private void startAndWriteData() throws IOException, InterruptedException { this.server = cluster.getRegionServerThreads().get(0).getRegionServer();