From d2ee29ffdfcbdb761f4f038f066fac34ee2ce6c8 Mon Sep 17 00:00:00 2001 From: wen_yi Date: Mon, 6 Jul 2020 11:47:16 +0800 Subject: [PATCH 1/7] HBASE-24665 all wal of RegionGroupingProvider together roll --- .../apache/hadoop/hbase/wal/AbstractWALRoller.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index d2b67170409f..56d6f8534f66 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -65,7 +65,7 @@ public abstract class AbstractWALRoller extends Thread private final long rollPeriod; private final int threadWakeFrequency; // The interval to check low replication on hlog's pipeline - private long checkLowReplicationInterval; + private final long checkLowReplicationInterval; private volatile boolean running = true; @@ -148,10 +148,9 @@ private void abort(String reason, Throwable cause) { @Override public void run() { while (running) { - boolean periodic = false; long now = System.currentTimeMillis(); checkLowReplication(now); - periodic = (now - this.lastRollTime) > this.rollPeriod; + boolean periodic = (now - this.lastRollTime) > this.rollPeriod; if (periodic) { // Time for periodic roll, fall through LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod); @@ -178,14 +177,17 @@ public void run() { for (Iterator> iter = walNeedsRoll.entrySet().iterator(); iter .hasNext();) { Entry entry = iter.next(); + if (!periodic && !entry.getValue()) { + continue; + } WAL wal = entry.getKey(); // reset the flag in front to avoid missing roll request before we return from rollWriter. - walNeedsRoll.put(wal, Boolean.FALSE); + entry.setValue(Boolean.FALSE); Map> regionsToFlush = null; try { // Force the roll if the logroll.period is elapsed or if a roll was requested. // The returned value is an collection of actual region and family names. - regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue()); + regionsToFlush = wal.rollWriter(true); } catch (WALClosedException e) { LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); iter.remove(); @@ -232,7 +234,7 @@ private boolean isWaiting() { * @return true if all WAL roll finished */ public boolean walRollFinished() { - return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting(); + return walNeedsRoll.values().stream().noneMatch(needRoll -> needRoll) && isWaiting(); } /** From 07afe84ba265901552e2938bdc27a1ba5af8fcff Mon Sep 17 00:00:00 2001 From: wen_yi Date: Fri, 10 Jul 2020 19:01:36 +0800 Subject: [PATCH 2/7] fix period roll --- .../hadoop/hbase/regionserver/LogRoller.java | 7 +- .../hadoop/hbase/wal/AbstractWALRoller.java | 68 +++++++++++++------ .../hbase/regionserver/TestLogRoller.java | 44 +++++++++++- 3 files changed, 97 insertions(+), 22 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 58ac82ee6cd7..e12fd9f93542 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -66,6 +67,10 @@ protected void scheduleFlush(String encodedRegionName, List families) { @VisibleForTesting Map getWalNeedsRoll() { - return this.walNeedsRoll; + Map walNeedsRoll = new HashMap<>(); + for (Map.Entry entry : this.walNeedsRoll.entrySet()) { + walNeedsRoll.put(entry.getKey(), entry.getValue().isRequestRoll()); + } + return walNeedsRoll; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index 56d6f8534f66..409808b36bc1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -20,7 +20,6 @@ import java.io.Closeable; import java.io.IOException; import java.net.ConnectException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -58,9 +57,8 @@ public abstract class AbstractWALRoller extends Thread protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period"; - protected final ConcurrentMap walNeedsRoll = new ConcurrentHashMap<>(); + protected final ConcurrentMap walNeedsRoll = new ConcurrentHashMap<>(); protected final T abortable; - private volatile long lastRollTime = System.currentTimeMillis(); // Period to roll log. private final long rollPeriod; private final int threadWakeFrequency; @@ -76,13 +74,14 @@ public void addWAL(WAL wal) { } // this is to avoid race between addWAL and requestRollAll. synchronized (this) { - if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) { + if (walNeedsRoll.putIfAbsent(wal, new RollController()) == null) { wal.registerWALActionsListener(new WALActionsListener() { @Override public void logRollRequested(WALActionsListener.RollRequestReason reason) { // TODO logs will contend with each other here, replace with e.g. DelayedQueue synchronized (AbstractWALRoller.this) { - walNeedsRoll.put(wal, Boolean.TRUE); + RollController controller = walNeedsRoll.computeIfAbsent(wal, rc -> new RollController()); + controller.requestRoll(); AbstractWALRoller.this.notifyAll(); } } @@ -93,9 +92,8 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) { public void requestRollAll() { synchronized (this) { - List wals = new ArrayList(walNeedsRoll.keySet()); - for (WAL wal : wals) { - walNeedsRoll.put(wal, Boolean.TRUE); + for (RollController controller : walNeedsRoll.values()) { + controller.requestRoll(); } notifyAll(); } @@ -115,9 +113,9 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) { */ private void checkLowReplication(long now) { try { - for (Entry entry : walNeedsRoll.entrySet()) { + for (Entry entry : walNeedsRoll.entrySet()) { WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue(); + boolean needRollAlready = entry.getValue().isRequestRoll; if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -150,13 +148,12 @@ public void run() { while (running) { long now = System.currentTimeMillis(); checkLowReplication(now); - boolean periodic = (now - this.lastRollTime) > this.rollPeriod; - if (periodic) { + if (walNeedsRoll.values().stream().anyMatch(rc -> rc.isPeriodRoll(now))) { // Time for periodic roll, fall through LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod); } else { synchronized (this) { - if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) { + if (walNeedsRoll.values().stream().anyMatch(rc -> rc.isRequestRoll)) { // WAL roll requested, fall through LOG.debug("WAL roll requested"); } else { @@ -173,16 +170,16 @@ public void run() { } } try { - this.lastRollTime = System.currentTimeMillis(); - for (Iterator> iter = walNeedsRoll.entrySet().iterator(); iter - .hasNext();) { - Entry entry = iter.next(); - if (!periodic && !entry.getValue()) { + for (Iterator> iter = walNeedsRoll.entrySet().iterator(); + iter.hasNext();) { + Entry entry = iter.next(); + RollController controller = entry.getValue(); + if (!controller.isRequestRoll && !controller.isPeriodRoll(now)) { continue; } WAL wal = entry.getKey(); // reset the flag in front to avoid missing roll request before we return from rollWriter. - entry.setValue(Boolean.FALSE); + controller.finishRoll(); Map> regionsToFlush = null; try { // Force the roll if the logroll.period is elapsed or if a roll was requested. @@ -234,7 +231,7 @@ private boolean isWaiting() { * @return true if all WAL roll finished */ public boolean walRollFinished() { - return walNeedsRoll.values().stream().noneMatch(needRoll -> needRoll) && isWaiting(); + return walNeedsRoll.values().stream().noneMatch(rc -> rc.isRequestRoll) && isWaiting(); } /** @@ -251,4 +248,35 @@ public void close() { running = false; interrupt(); } + + /** + * Independently control the roll of each wal. When use multiwal, + * can avoid all wal roll together. see HBASE-24665 for detail + */ + protected class RollController { + boolean isRequestRoll; + long lastRollTime; + + RollController() { + this.isRequestRoll = false; + this.lastRollTime = System.currentTimeMillis(); + } + + void requestRoll() { + this.isRequestRoll = true; + } + + void finishRoll() { + this.isRequestRoll = false; + this.lastRollTime = System.currentTimeMillis(); + } + + public boolean isRequestRoll() { + return isRequestRoll; + } + + boolean isPeriodRoll(long now) { + return (now - lastRollTime) > rollPeriod; + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java index 7892d4478f2a..fcb166c4fca5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java @@ -19,9 +19,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; @@ -35,6 +38,8 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.HashMap; +import java.util.Map; @Category({RegionServerTests.class, MediumTests.class}) public class TestLogRoller { @@ -43,7 +48,7 @@ public class TestLogRoller { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestLogRoller.class); - private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final int logRollPeriod = 20 * 1000; @@ -92,4 +97,41 @@ public void testRemoveClosedWAL() throws Exception { assertEquals(originalSize, logRoller.getWalNeedsRoll().size()); } + + /** + * verify that each wal roll separately + */ + @Test + public void testRequestRollWithMultiWal() throws Exception { + HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); + Configuration conf = rs.getConfiguration(); + LogRoller logRoller = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller(); + FileSystem fs = rs.getFileSystem(); + // add multiple wal + Map wals = new HashMap<>(); + for (int i = 1; i <= 3; i++) { + FSHLog wal = new FSHLog(fs, rs.getWALRootDir(), + AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), + AbstractFSWALProvider.getWALArchiveDirectoryName(conf, rs.getServerName().getServerName()), + conf, null, true, "wal-test", "." + i); + wal.init(); + wals.put(wal, wal.getCurrentFileName()); + logRoller.addWAL(wal); + Thread.sleep(3000); + } + + // request roll + Map.Entry rollWal = wals.entrySet().iterator().next(); + rollWal.getKey().requestLogRoll(); + Thread.sleep(5000); + assertNotEquals(rollWal.getValue(), rollWal.getKey().getCurrentFileName()); + wals.put(rollWal.getKey(), rollWal.getKey().getCurrentFileName()); + + // period roll + Thread.sleep(logRollPeriod + 5000); + for (Map.Entry entry : wals.entrySet()) { + assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName()); + entry.getKey().close(); + } + } } From 8b427b3be54a59b2cf1ac4bf45f494e0d3f7ad65 Mon Sep 17 00:00:00 2001 From: wen_yi Date: Mon, 13 Jul 2020 13:54:07 +0800 Subject: [PATCH 3/7] fix --- .../hadoop/hbase/regionserver/LogRoller.java | 9 +- .../hadoop/hbase/wal/AbstractWALRoller.java | 85 ++++++++++--------- .../hbase/regionserver/TestLogRoller.java | 14 ++- 3 files changed, 56 insertions(+), 52 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index e12fd9f93542..cbab595517ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -66,11 +65,7 @@ protected void scheduleFlush(String encodedRegionName, List families) { } @VisibleForTesting - Map getWalNeedsRoll() { - Map walNeedsRoll = new HashMap<>(); - for (Map.Entry entry : this.walNeedsRoll.entrySet()) { - walNeedsRoll.put(entry.getKey(), entry.getValue().isRequestRoll()); - } - return walNeedsRoll; + Map getWalNeedsRoll() { + return this.wals; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index 409808b36bc1..2d387be990f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -57,7 +57,7 @@ public abstract class AbstractWALRoller extends Thread protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period"; - protected final ConcurrentMap walNeedsRoll = new ConcurrentHashMap<>(); + protected final ConcurrentMap wals = new ConcurrentHashMap<>(); protected final T abortable; // Period to roll log. private final long rollPeriod; @@ -69,18 +69,18 @@ public abstract class AbstractWALRoller extends Thread public void addWAL(WAL wal) { // check without lock first - if (walNeedsRoll.containsKey(wal)) { + if (wals.containsKey(wal)) { return; } // this is to avoid race between addWAL and requestRollAll. synchronized (this) { - if (walNeedsRoll.putIfAbsent(wal, new RollController()) == null) { + if (wals.putIfAbsent(wal, new RollController(wal)) == null) { wal.registerWALActionsListener(new WALActionsListener() { @Override public void logRollRequested(WALActionsListener.RollRequestReason reason) { // TODO logs will contend with each other here, replace with e.g. DelayedQueue synchronized (AbstractWALRoller.this) { - RollController controller = walNeedsRoll.computeIfAbsent(wal, rc -> new RollController()); + RollController controller = wals.computeIfAbsent(wal, rc -> new RollController(wal)); controller.requestRoll(); AbstractWALRoller.this.notifyAll(); } @@ -92,7 +92,7 @@ public void logRollRequested(WALActionsListener.RollRequestReason reason) { public void requestRollAll() { synchronized (this) { - for (RollController controller : walNeedsRoll.values()) { + for (RollController controller : wals.values()) { controller.requestRoll(); } notifyAll(); @@ -113,9 +113,9 @@ protected AbstractWALRoller(String name, Configuration conf, T abortable) { */ private void checkLowReplication(long now) { try { - for (Entry entry : walNeedsRoll.entrySet()) { + for (Entry entry : wals.entrySet()) { WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue().isRequestRoll; + boolean needRollAlready = entry.getValue().isRollRequested(); if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -131,7 +131,7 @@ private void abort(String reason, Throwable cause) { // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it // is already broken. - for (WAL wal : walNeedsRoll.keySet()) { + for (WAL wal : wals.keySet()) { // shutdown rather than close here since we are going to abort the RS and the wals need to be // split when recovery try { @@ -148,43 +148,39 @@ public void run() { while (running) { long now = System.currentTimeMillis(); checkLowReplication(now); - if (walNeedsRoll.values().stream().anyMatch(rc -> rc.isPeriodRoll(now))) { - // Time for periodic roll, fall through - LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod); - } else { - synchronized (this) { - if (walNeedsRoll.values().stream().anyMatch(rc -> rc.isRequestRoll)) { - // WAL roll requested, fall through - LOG.debug("WAL roll requested"); - } else { - try { - wait(this.threadWakeFrequency); - } catch (InterruptedException e) { - // restore the interrupt state - Thread.currentThread().interrupt(); - } - // goto the beginning to check whether again whether we should fall through to roll - // several WALs, and also check whether we should quit. - continue; + synchronized (this) { + if (wals.values().stream().noneMatch(rc -> rc.needsRoll(now))) { + try { + wait(this.threadWakeFrequency); + } catch (InterruptedException e) { + // restore the interrupt state + Thread.currentThread().interrupt(); } + // goto the beginning to check whether again whether we should fall through to roll + // several WALs, and also check whether we should quit. + continue; } } try { - for (Iterator> iter = walNeedsRoll.entrySet().iterator(); + for (Iterator> iter = wals.entrySet().iterator(); iter.hasNext();) { Entry entry = iter.next(); + WAL wal = entry.getKey(); RollController controller = entry.getValue(); - if (!controller.isRequestRoll && !controller.isPeriodRoll(now)) { + if (controller.isRollRequested()) { + // WAL roll requested, fall through + LOG.debug("WAL {} roll requested", wal); + } else if (controller.needsPeriodicRoll(now)){ + // Time for periodic roll, fall through + LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod); + } else { continue; } - WAL wal = entry.getKey(); - // reset the flag in front to avoid missing roll request before we return from rollWriter. - controller.finishRoll(); Map> regionsToFlush = null; try { // Force the roll if the logroll.period is elapsed or if a roll was requested. // The returned value is an collection of actual region and family names. - regionsToFlush = wal.rollWriter(true); + regionsToFlush = controller.rollWal(now); } catch (WALClosedException e) { LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); iter.remove(); @@ -231,7 +227,7 @@ private boolean isWaiting() { * @return true if all WAL roll finished */ public boolean walRollFinished() { - return walNeedsRoll.values().stream().noneMatch(rc -> rc.isRequestRoll) && isWaiting(); + return wals.values().stream().noneMatch(RollController::isRollRequested) && isWaiting(); } /** @@ -254,29 +250,36 @@ public void close() { * can avoid all wal roll together. see HBASE-24665 for detail */ protected class RollController { - boolean isRequestRoll; - long lastRollTime; + private final WAL wal; + private boolean isRequestRoll; + private long lastRollTime; - RollController() { + RollController(WAL wal) { + this.wal = wal; this.isRequestRoll = false; this.lastRollTime = System.currentTimeMillis(); } - void requestRoll() { + public synchronized void requestRoll() { this.isRequestRoll = true; } - void finishRoll() { + public synchronized Map> rollWal(long lastRollTime) throws IOException { this.isRequestRoll = false; - this.lastRollTime = System.currentTimeMillis(); + this.lastRollTime = lastRollTime; + return wal.rollWriter(true); } - public boolean isRequestRoll() { + public boolean isRollRequested() { return isRequestRoll; } - boolean isPeriodRoll(long now) { + public boolean needsPeriodicRoll(long now) { return (now - lastRollTime) > rollPeriod; } + + public boolean needsRoll(long now) { + return isRequestRoll || needsPeriodicRoll(now); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java index fcb166c4fca5..1bcaca79caed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java @@ -39,6 +39,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; @Category({RegionServerTests.class, MediumTests.class}) @@ -121,11 +122,16 @@ public void testRequestRollWithMultiWal() throws Exception { } // request roll - Map.Entry rollWal = wals.entrySet().iterator().next(); - rollWal.getKey().requestLogRoll(); + Iterator> it = wals.entrySet().iterator(); + Map.Entry walEntry = it.next(); + walEntry.getKey().requestLogRoll(); Thread.sleep(5000); - assertNotEquals(rollWal.getValue(), rollWal.getKey().getCurrentFileName()); - wals.put(rollWal.getKey(), rollWal.getKey().getCurrentFileName()); + assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); + walEntry.setValue(walEntry.getKey().getCurrentFileName()); + while (it.hasNext()) { + walEntry = it.next(); + assertEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); + } // period roll Thread.sleep(logRollPeriod + 5000); From 15cdba79082d52995c1ad69703007207dbffca41 Mon Sep 17 00:00:00 2001 From: wen_yi Date: Tue, 14 Jul 2020 17:02:47 +0800 Subject: [PATCH 4/7] fix RollController and TestLogRoller --- .../hadoop/hbase/wal/AbstractWALRoller.java | 42 ++++--- .../hbase/regionserver/TestLogRoller.java | 103 ++++++++++-------- 2 files changed, 80 insertions(+), 65 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index 2d387be990f7..31dc12396266 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -26,6 +26,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; @@ -115,7 +116,7 @@ private void checkLowReplication(long now) { try { for (Entry entry : wals.entrySet()) { WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue().isRollRequested(); + boolean needRollAlready = entry.getValue().needsRoll(now); if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -167,29 +168,31 @@ public void run() { Entry entry = iter.next(); WAL wal = entry.getKey(); RollController controller = entry.getValue(); + boolean isRequestRoll; if (controller.isRollRequested()) { // WAL roll requested, fall through LOG.debug("WAL {} roll requested", wal); + isRequestRoll = true; } else if (controller.needsPeriodicRoll(now)){ // Time for periodic roll, fall through LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod); + isRequestRoll = false; } else { continue; } - Map> regionsToFlush = null; try { // Force the roll if the logroll.period is elapsed or if a roll was requested. // The returned value is an collection of actual region and family names. - regionsToFlush = controller.rollWal(now); + Map> regionsToFlush = controller.rollWal(now, isRequestRoll); + if (regionsToFlush != null) { + for (Map.Entry> r : regionsToFlush.entrySet()) { + scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); + } + } } catch (WALClosedException e) { LOG.warn("WAL has been closed. Skipping rolling of writer and just remove it", e); iter.remove(); } - if (regionsToFlush != null) { - for (Map.Entry> r : regionsToFlush.entrySet()) { - scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); - } - } afterRoll(wal); } } catch (FailedLogCloseException | ConnectException e) { @@ -251,35 +254,38 @@ public void close() { */ protected class RollController { private final WAL wal; - private boolean isRequestRoll; + // avoid missing roll request before we return from rollWriter + private final AtomicInteger rollRequestCounter; private long lastRollTime; RollController(WAL wal) { this.wal = wal; - this.isRequestRoll = false; + this.rollRequestCounter = new AtomicInteger(0); this.lastRollTime = System.currentTimeMillis(); } - public synchronized void requestRoll() { - this.isRequestRoll = true; + public void requestRoll() { + this.rollRequestCounter.incrementAndGet(); } - public synchronized Map> rollWal(long lastRollTime) throws IOException { - this.isRequestRoll = false; - this.lastRollTime = lastRollTime; + public Map> rollWal(long now, boolean isRequestRoll) throws IOException { + if (isRequestRoll) { + this.rollRequestCounter.decrementAndGet(); + } + this.lastRollTime = now; return wal.rollWriter(true); } public boolean isRollRequested() { - return isRequestRoll; + return rollRequestCounter.get() > 0; } public boolean needsPeriodicRoll(long now) { - return (now - lastRollTime) > rollPeriod; + return (now - this.lastRollTime) > rollPeriod; } public boolean needsRoll(long now) { - return isRequestRoll || needsPeriodicRoll(now); + return isRollRequested() || needsPeriodicRoll(now); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java index 1bcaca79caed..1875e9fb8801 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java @@ -27,17 +27,17 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -51,52 +51,62 @@ public class TestLogRoller { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final int logRollPeriod = 20 * 1000; + private static final int LOG_ROLL_PERIOD = 20 * 1000; + private static final String LOG_DIR = "WALs"; + private static final String ARCHIVE_DIR = "archiveWALs"; + private static final String WAL_PREFIX = "test-log-roller"; + private static Configuration CONF; + private static LogRoller ROLLER; + private static Path ROOT_DIR; + private static FileSystem FS; @Before public void setup() throws Exception { - TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.period", logRollPeriod); - TEST_UTIL.startMiniCluster(1); - TableName name = TableName.valueOf("Test"); - TEST_UTIL.createTable(name, Bytes.toBytes("cf")); - TEST_UTIL.waitTableAvailable(name); + CONF = TEST_UTIL.getConfiguration(); + CONF.setInt("hbase.regionserver.logroll.period", LOG_ROLL_PERIOD); + CONF.setInt(HConstants.THREAD_WAKE_FREQUENCY, 300); + ROOT_DIR = TEST_UTIL.getRandomDir(); + FS = FileSystem.get(CONF); + RegionServerServices services = Mockito.mock(RegionServerServices.class); + Mockito.when(services.getConfiguration()).thenReturn(CONF); + ROLLER = new LogRoller(services); + ROLLER.start(); } @After public void tearDown() throws Exception { + ROLLER.close(); + FS.close(); TEST_UTIL.shutdownMiniCluster(); } @Test public void testRemoveClosedWAL() throws Exception { - HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); - Configuration conf = rs.getConfiguration(); - LogRoller logRoller = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller(); - int originalSize = logRoller.getWalNeedsRoll().size(); - FSHLog wal1 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(), - AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf); - logRoller.addWAL(wal1); - FSHLog wal2 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(), - AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf); - logRoller.addWAL(wal2); - FSHLog wal3 = new FSHLog(rs.getWALFileSystem(), rs.getWALRootDir(), - AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), conf); - logRoller.addWAL(wal3); - - assertEquals(originalSize + 3, logRoller.getWalNeedsRoll().size()); - assertTrue(logRoller.getWalNeedsRoll().containsKey(wal1)); - - wal1.close(); - Thread.sleep(2 * logRollPeriod); - - assertEquals(originalSize + 2, logRoller.getWalNeedsRoll().size()); - assertFalse(logRoller.getWalNeedsRoll().containsKey(wal1)); - - wal2.close(); - wal3.close(); - Thread.sleep(2 * logRollPeriod); - - assertEquals(originalSize, logRoller.getWalNeedsRoll().size()); + assertEquals(0, ROLLER.getWalNeedsRoll().size()); + for (int i = 1; i <= 3; i++) { + FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null, + true, WAL_PREFIX, getWALSuffix(i)); + ROLLER.addWAL(wal); + } + + assertEquals(3, ROLLER.getWalNeedsRoll().size()); + Iterator it = ROLLER.getWalNeedsRoll().keySet().iterator(); + WAL wal = it.next(); + assertTrue(ROLLER.getWalNeedsRoll().containsKey(wal)); + + wal.close(); + Thread.sleep(LOG_ROLL_PERIOD + 5000); + + assertEquals(2, ROLLER.getWalNeedsRoll().size()); + assertFalse(ROLLER.getWalNeedsRoll().containsKey(wal)); + + wal = it.next(); + wal.close(); + wal = it.next(); + wal.close(); + Thread.sleep(LOG_ROLL_PERIOD + 5000); + + assertEquals(0, ROLLER.getWalNeedsRoll().size()); } /** @@ -104,20 +114,14 @@ public void testRemoveClosedWAL() throws Exception { */ @Test public void testRequestRollWithMultiWal() throws Exception { - HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); - Configuration conf = rs.getConfiguration(); - LogRoller logRoller = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWalRoller(); - FileSystem fs = rs.getFileSystem(); // add multiple wal Map wals = new HashMap<>(); for (int i = 1; i <= 3; i++) { - FSHLog wal = new FSHLog(fs, rs.getWALRootDir(), - AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().getServerName()), - AbstractFSWALProvider.getWALArchiveDirectoryName(conf, rs.getServerName().getServerName()), - conf, null, true, "wal-test", "." + i); + FSHLog wal = new FSHLog(FS, ROOT_DIR, LOG_DIR, ARCHIVE_DIR, CONF, null, + true, WAL_PREFIX, getWALSuffix(i)); wal.init(); wals.put(wal, wal.getCurrentFileName()); - logRoller.addWAL(wal); + ROLLER.addWAL(wal); Thread.sleep(3000); } @@ -126,6 +130,7 @@ public void testRequestRollWithMultiWal() throws Exception { Map.Entry walEntry = it.next(); walEntry.getKey().requestLogRoll(); Thread.sleep(5000); + assertNotEquals(walEntry.getValue(), walEntry.getKey().getCurrentFileName()); walEntry.setValue(walEntry.getKey().getCurrentFileName()); while (it.hasNext()) { @@ -134,10 +139,14 @@ public void testRequestRollWithMultiWal() throws Exception { } // period roll - Thread.sleep(logRollPeriod + 5000); + Thread.sleep(LOG_ROLL_PERIOD + 5000); for (Map.Entry entry : wals.entrySet()) { assertNotEquals(entry.getValue(), entry.getKey().getCurrentFileName()); entry.getKey().close(); } } + + private static String getWALSuffix(int id) { + return "." + id; + } } From 3c1a4807a3f938d009e5b1326f30db271d68f9d8 Mon Sep 17 00:00:00 2001 From: wen_yi Date: Mon, 20 Jul 2020 16:15:42 +0800 Subject: [PATCH 5/7] fix --- .../hadoop/hbase/wal/AbstractWALRoller.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index 31dc12396266..abe7a06dc007 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -26,7 +26,7 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; @@ -183,7 +183,7 @@ public void run() { try { // Force the roll if the logroll.period is elapsed or if a roll was requested. // The returned value is an collection of actual region and family names. - Map> regionsToFlush = controller.rollWal(now, isRequestRoll); + Map> regionsToFlush = controller.rollWal(now); if (regionsToFlush != null) { for (Map.Entry> r : regionsToFlush.entrySet()) { scheduleFlush(Bytes.toString(r.getKey()), r.getValue()); @@ -254,30 +254,28 @@ public void close() { */ protected class RollController { private final WAL wal; - // avoid missing roll request before we return from rollWriter - private final AtomicInteger rollRequestCounter; + private final AtomicBoolean rollRequest; private long lastRollTime; RollController(WAL wal) { this.wal = wal; - this.rollRequestCounter = new AtomicInteger(0); + this.rollRequest = new AtomicBoolean(false); this.lastRollTime = System.currentTimeMillis(); } public void requestRoll() { - this.rollRequestCounter.incrementAndGet(); + this.rollRequest.set(true); } - public Map> rollWal(long now, boolean isRequestRoll) throws IOException { - if (isRequestRoll) { - this.rollRequestCounter.decrementAndGet(); - } + public Map> rollWal(long now) throws IOException { this.lastRollTime = now; - return wal.rollWriter(true); + Map> regionsToFlush = wal.rollWriter(true); + this.rollRequest.set(false); + return regionsToFlush; } public boolean isRollRequested() { - return rollRequestCounter.get() > 0; + return rollRequest.get(); } public boolean needsPeriodicRoll(long now) { From 11e9e134606753f1706c89699a541bbb2a64195d Mon Sep 17 00:00:00 2001 From: wen_yi Date: Mon, 20 Jul 2020 19:10:06 +0800 Subject: [PATCH 6/7] fix --- .../java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index abe7a06dc007..69d5781313d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -269,9 +269,9 @@ public void requestRoll() { public Map> rollWal(long now) throws IOException { this.lastRollTime = now; - Map> regionsToFlush = wal.rollWriter(true); + // reset the flag in front to avoid missing roll request before we return from rollWriter. this.rollRequest.set(false); - return regionsToFlush; + return wal.rollWriter(true); } public boolean isRollRequested() { From 18bfdf715d063b64b50f3364dee726ffe1d6716a Mon Sep 17 00:00:00 2001 From: wen_yi Date: Tue, 21 Jul 2020 20:02:44 +0800 Subject: [PATCH 7/7] fix --- .../org/apache/hadoop/hbase/wal/AbstractWALRoller.java | 7 +++---- .../apache/hadoop/hbase/regionserver/TestLogRoller.java | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index 69d5781313d2..a5a0ee3a3225 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -168,15 +168,12 @@ public void run() { Entry entry = iter.next(); WAL wal = entry.getKey(); RollController controller = entry.getValue(); - boolean isRequestRoll; if (controller.isRollRequested()) { // WAL roll requested, fall through LOG.debug("WAL {} roll requested", wal); - isRequestRoll = true; } else if (controller.needsPeriodicRoll(now)){ // Time for periodic roll, fall through LOG.debug("WAL {} roll period {} ms elapsed", wal, this.rollPeriod); - isRequestRoll = false; } else { continue; } @@ -230,7 +227,9 @@ private boolean isWaiting() { * @return true if all WAL roll finished */ public boolean walRollFinished() { - return wals.values().stream().noneMatch(RollController::isRollRequested) && isWaiting(); + // TODO add a status field of roll in RollController + return wals.values().stream().noneMatch(rc -> rc.needsRoll(System.currentTimeMillis())) + && isWaiting(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java index 1875e9fb8801..ed7d5dc9c430 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestLogRoller.java @@ -122,7 +122,7 @@ public void testRequestRollWithMultiWal() throws Exception { wal.init(); wals.put(wal, wal.getCurrentFileName()); ROLLER.addWAL(wal); - Thread.sleep(3000); + Thread.sleep(1000); } // request roll