From 35a9faff099b79d8cbc2086a01bcb15627deef10 Mon Sep 17 00:00:00 2001
From: comnetwork
Date: Mon, 5 Sep 2022 21:41:44 +0800
Subject: [PATCH 1/5] HBASE-27358 Avoid synchronization in AsyncFSWAL

---
 .../hbase/regionserver/wal/AsyncFSWAL.java    | 298 ++++++++++--------
 1 file changed, 166 insertions(+), 132 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 8aec10cb1cf5..348dc6db371b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -34,15 +34,13 @@
 import java.util.Queue;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -52,6 +50,7 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -150,8 +149,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final Class<? extends Channel> channelClass;
 
-  private final Lock consumeLock = new ReentrantLock();
-
   private final Runnable consumer = this::consume;
 
   // check if there is already a consumer task in the event loop's task queue
@@ -165,11 +162,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   // all other bits are the epoch number of the current writer, this is used to detect whether the
   // writer is still the one when you issue the sync.
   // notice that, modification to this field is only allowed under the protection of consumeLock.
-  private volatile int epochAndState;
-
-  private boolean readyForRolling;
-
-  private final Condition readyForRollingCond = consumeLock.newCondition();
+  private int epochAndState;
 
   private final RingBuffer<RingBufferTruck> waitingConsumePayloads;
 
@@ -202,6 +195,15 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final StreamSlowMonitor streamSlowMonitor;
 
+  /**
+   * {@link AsyncFSWAL#doReplaceWriter} and {@link AsyncFSWAL#doShutdown} are protected by
+   * {@link AbstractFSWAL#rollWriterLock}, so there is at most one method call at the same time and
+   * we can just use a simple variable to save the request.
+   */
+  private volatile ReplaceWriterRequest replaceWriterRequest = null;
+
+  private boolean alreadyProcessedShutdown = false;
+
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
     Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
     String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
@@ -272,8 +274,8 @@ private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
     syncFutureCache.offer(future);
   }
 
-  private static boolean waitingRoll(int epochAndState) {
-    return (epochAndState & 1) != 0;
+  private boolean waitingRoll() {
+    return this.replaceWriterRequest != null;
   }
 
   private static boolean writerBroken(int epochAndState) {
@@ -284,53 +286,51 @@ private static int epoch(int epochAndState) {
     return epochAndState >>> 2;
   }
 
+  private boolean completeRolling() {
+    ReplaceWriterRequest replaceWriterRequestToUse = this.replaceWriterRequest;
+    if (replaceWriterRequestToUse == null) {
+      return false;
+    }
+    this.replaceWriterRequest = null;
+    if (replaceWriterRequestToUse.shutdown) {
+      this.processShutdown(replaceWriterRequestToUse);
+    } else {
+      this.processReplaceWriter(replaceWriterRequestToUse);
+    }
+    return true;
+  }
+
   // return whether we have successfully set readyForRolling to true.
-  private boolean trySetReadyForRolling() {
+  private boolean tryCompleteRolling() {
     // Check without holding lock first. Usually we will just return here.
     // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to
     // check them outside the consumeLock.
-    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
+    if (!waitingRoll() || !unackedAppends.isEmpty()) {
       return false;
     }
-    consumeLock.lock();
-    try {
-      // 1. a roll is requested
-      // 2. all out-going entries have been acked(we have confirmed above).
-      if (waitingRoll(epochAndState)) {
-        readyForRolling = true;
-        readyForRollingCond.signalAll();
-        return true;
-      } else {
-        return false;
-      }
-    } finally {
-      consumeLock.unlock();
-    }
+
+    return this.completeRolling();
   }
 
   private void syncFailed(long epochWhenSync, Throwable error) {
     LOG.warn("sync failed", error);
     boolean shouldRequestLogRoll = true;
-    consumeLock.lock();
-    try {
-      int currentEpochAndState = epochAndState;
-      if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
-        // this is not the previous writer which means we have already rolled the writer.
-        // or this is still the current writer, but we have already marked it as broken and request
-        // a roll.
-        return;
-      }
-      this.epochAndState = currentEpochAndState | 0b10;
-      if (waitingRoll(currentEpochAndState)) {
-        readyForRolling = true;
-        readyForRollingCond.signalAll();
-        // this means we have already in the middle of a rollWriter so just tell the roller thread
-        // that you can continue without requesting an extra log roll.
-        shouldRequestLogRoll = false;
-      }
-    } finally {
-      consumeLock.unlock();
+
+    int currentEpochAndState = epochAndState;
+    if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) {
+      // this is not the previous writer which means we have already rolled the writer.
+      // or this is still the current writer, but we have already marked it as broken and request
+      // a roll.
+      return;
+    }
+    this.epochAndState = currentEpochAndState | 0b10;
+
+    if (this.completeRolling()) {
+      // this means we have already in the middle of a rollWriter so just tell the roller thread
+      // that you can continue without requesting an extra log roll.
+      shouldRequestLogRoll = false;
     }
+
     for (Iterator<FSWALEntry> iter = unackedAppends.descendingIterator(); iter.hasNext();) {
       toWriteAppends.addFirst(iter.next());
     }
@@ -384,7 +384,7 @@ private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processe
       }
     }
     postSync(System.nanoTime() - startTimeNs, finishSync());
-    if (trySetReadyForRolling()) {
+    if (tryCompleteRolling()) {
       // we have just finished a roll, then do not need to check for log rolling, the writer will be
       // closed soon.
       return;
@@ -551,7 +551,7 @@ private void appendAndSync() {
       if (unackedAppends.isEmpty()) {
         highestSyncedTxid.set(highestProcessedAppendTxid);
         finishSync();
-        trySetReadyForRolling();
+        tryCompleteRolling();
       }
       return;
     }
@@ -604,27 +604,26 @@ private void drainNonMarkerEditsAndFailSyncs() {
   }
 
   private void consume() {
-    consumeLock.lock();
-    try {
-      int currentEpochAndState = epochAndState;
-      if (writerBroken(currentEpochAndState)) {
-        return;
-      }
-      if (waitingRoll(currentEpochAndState)) {
-        if (writer.getLength() > fileLengthAtLastSync) {
-          // issue a sync
-          sync(writer);
-        } else {
-          if (unackedAppends.isEmpty()) {
-            readyForRolling = true;
-            readyForRollingCond.signalAll();
-          }
+    if (this.writer == null) {
+      this.completeRolling();
+      return;
+    }
+    if (writerBroken(epochAndState)) {
+      this.completeRolling();
+      return;
+    }
+    if (waitingRoll()) {
+      if (writer.getLength() > fileLengthAtLastSync) {
+        // issue a sync
+        sync(writer);
+      } else {
+        if (unackedAppends.isEmpty()) {
+          this.completeRolling();
         }
-        return;
       }
-    } finally {
-      consumeLock.unlock();
+      return;
     }
+
     long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
     for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
       <= cursorBound; nextCursor++) {
@@ -649,6 +648,9 @@ private void consume() {
       drainNonMarkerEditsAndFailSyncs();
     }
     appendAndSync();
+    if (this.alreadyProcessedShutdown) {
+      return;
+    }
     if (hasConsumerTask.get()) {
       return;
     }
@@ -685,7 +687,7 @@ private void consume() {
 
   private boolean shouldScheduleConsumer() {
     int currentEpochAndState = epochAndState;
-    if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) {
+    if (writerBroken(currentEpochAndState) || waitingRoll()) {
       return false;
     }
     return consumerScheduled.compareAndSet(false, true);
@@ -759,23 +761,10 @@ protected AsyncWriter createWriterInstance(Path path) throws IOException {
     return createAsyncWriter(fs, path);
   }
 
-  private void waitForSafePoint() {
-    consumeLock.lock();
-    try {
-      int currentEpochAndState = epochAndState;
-      if (writerBroken(currentEpochAndState) || this.writer == null) {
-        return;
-      }
-      consumerScheduled.set(true);
-      epochAndState = currentEpochAndState | 1;
-      readyForRolling = false;
-      consumeExecutor.execute(consumer);
-      while (!readyForRolling) {
-        readyForRollingCond.awaitUninterruptibly();
-      }
-    } finally {
-      consumeLock.unlock();
-    }
+  private void addReplaceWriterRequest(ReplaceWriterRequest request) {
+    this.replaceWriterRequest = request;
+    consumerScheduled.set(true);
+    consumeExecutor.execute(consumer);
   }
 
   protected final long closeWriter(AsyncWriter writer, Path path) {
@@ -800,70 +789,101 @@ protected final long closeWriter(AsyncWriter writer, Path path) {
 
   @Override
   protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
     throws IOException {
-    Preconditions.checkNotNull(nextWriter);
-    waitForSafePoint();
-    long oldFileLen = closeWriter(this.writer, oldPath);
-    logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
-    this.writer = nextWriter;
-    if (nextWriter instanceof AsyncProtobufLogWriter) {
-      this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
-    }
-    this.fileLengthAtLastSync = nextWriter.getLength();
-    this.highestProcessedAppendTxidAtLastSync = 0L;
-    consumeLock.lock();
     try {
+      Preconditions.checkNotNull(nextWriter);
+      AsyncWriter oldWriter = this.writer;
+      long oldFileLen = oldWriter == null ? 0 : oldWriter.getLength();
+      ReplaceWriterRequest request = new ReplaceWriterRequest(oldPath, nextWriter, false);
+      addReplaceWriterRequest(request);
+      FutureUtils.get(request.future);
+      logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
+      rollRequested.set(false);
       consumerScheduled.set(true);
+      consumeExecutor.execute(consumer);
+    } finally {
+      this.replaceWriterRequest = null;
+    }
+  }
+
+  private void processReplaceWriter(ReplaceWriterRequest replaceWriterRequest) {
+    try {
+      Path oldPath = replaceWriterRequest.oldPath;
+      AsyncWriter nextWriter = replaceWriterRequest.nextWriter;
+      closeWriter(this.writer, oldPath);
+      this.writer = nextWriter;
+      if (nextWriter instanceof AsyncProtobufLogWriter) {
+        this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
+      }
+      this.fileLengthAtLastSync = nextWriter.getLength();
+      this.highestProcessedAppendTxidAtLastSync = 0L;
+
       int currentEpoch = epochAndState >>> 2;
       int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
       // set a new epoch and also clear waitingRoll and writerBroken
       this.epochAndState = nextEpoch << 2;
-      // Reset rollRequested status
-      rollRequested.set(false);
-      consumeExecutor.execute(consumer);
-    } finally {
-      consumeLock.unlock();
+      replaceWriterRequest.future.complete(true);
+    } catch (Exception exception) {
+      LOG.error("processReplaceWriter error!", exception);
+      replaceWriterRequest.future.completeExceptionally(exception);
     }
   }
 
   @Override
   protected void doShutdown() throws IOException {
-    waitForSafePoint();
-    closeWriter(this.writer, getOldPath());
-    this.writer = null;
-    closeExecutor.shutdown();
     try {
-      if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
-        LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
-          + " the close of async writer doesn't complete."
-          + "Please check the status of underlying filesystem"
-          + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
-          + "\"");
+      ReplaceWriterRequest request = new ReplaceWriterRequest(getOldPath(), null, true);
+      addReplaceWriterRequest(request);
+      FutureUtils.get(request.future);
+      if (!(consumeExecutor instanceof EventLoop)) {
+        consumeExecutor.shutdown();
       }
-    } catch (InterruptedException e) {
-      LOG.error("The wait for close of async writer is interrupted");
-      Thread.currentThread().interrupt();
+    } finally {
+      this.replaceWriterRequest = null;
     }
-    IOException error = new IOException("WAL has been closed");
-    long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
-    // drain all the pending sync requests
-    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
-      <= cursorBound; nextCursor++) {
-      if (!waitingConsumePayloads.isPublished(nextCursor)) {
-        break;
+  }
+
+  private void processShutdown(ReplaceWriterRequest replaceWriterRequest) {
+    try {
+      closeWriter(this.writer, replaceWriterRequest.oldPath);
+      this.writer = null;
+      closeExecutor.shutdown();
+      try {
+        if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
+          LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but"
+            + " the close of async writer doesn't complete."
+            + "Please check the status of underlying filesystem"
+            + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS
+            + "\"");
+        }
+      } catch (InterruptedException e) {
+        LOG.error("The wait for close of async writer is interrupted");
+        Thread.currentThread().interrupt();
       }
-      RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
-      switch (truck.type()) {
-        case SYNC:
-          syncFutures.add(truck.unloadSync());
-          break;
-        default:
+      IOException error = new IOException("WAL has been closed");
+      long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
+      // drain all the pending sync requests
+      for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor
+        <= cursorBound; nextCursor++) {
+        if (!waitingConsumePayloads.isPublished(nextCursor)) {
           break;
+        }
+        RingBufferTruck truck = waitingConsumePayloads.get(nextCursor);
+        switch (truck.type()) {
+          case SYNC:
+            syncFutures.add(truck.unloadSync());
+            break;
+          default:
+            break;
+        }
       }
-    }
-    // and fail them
-    syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
-    if (!(consumeExecutor instanceof EventLoop)) {
-      consumeExecutor.shutdown();
+      // and fail them
+      syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
+      replaceWriterRequest.future.complete(true);
+    } catch (Exception exception) {
+      LOG.error("shut down error!", exception);
+      replaceWriterRequest.future.completeExceptionally(exception);
+    } finally {
+      this.alreadyProcessedShutdown = true;
     }
   }
@@ -890,4 +910,18 @@ protected boolean doCheckLogLowReplication() {
     AsyncFSOutput output = this.fsOut;
     return output != null && output.isBroken();
   }
+
+  static class ReplaceWriterRequest {
+    private final CompletableFuture<Boolean> future;
+    private final Path oldPath;
+    private final AsyncWriter nextWriter;
+    private final boolean shutdown;
+
+    ReplaceWriterRequest(Path oldPath, AsyncWriter nextWriter, boolean shutdown) {
+      this.oldPath = oldPath;
+      this.nextWriter = nextWriter;
+      this.future = new CompletableFuture<>();
+      this.shutdown = shutdown;
+    }
+  }
 }

From f1df0eeed4916a22fdd6ea65b25867e6b8c9b5f5 Mon Sep 17 00:00:00 2001
From: comnetwork
Date: Thu, 8 Sep 2022 13:21:36 +0800
Subject: [PATCH 2/5] fix atomic reference

---
 .../hbase/regionserver/wal/AsyncFSWAL.java    | 29 ++++++++++---------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 348dc6db371b..6aa1f75a5e14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -41,6 +41,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -200,7 +201,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
    * {@link AbstractFSWAL#rollWriterLock}, so there is at most one method call at the same time and
    * we can just use a simple variable to save the request.
    */
-  private volatile ReplaceWriterRequest replaceWriterRequest = null;
+  private final AtomicReference<ReplaceWriterRequest> replaceWriterRequestRef =
+    new AtomicReference<>(null);
 
   private boolean alreadyProcessedShutdown = false;
@@ -275,7 +277,7 @@ private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
   }
 
   private boolean waitingRoll() {
-    return this.replaceWriterRequest != null;
+    return this.replaceWriterRequestRef.get() != null;
   }
 
   private static boolean writerBroken(int epochAndState) {
@@ -287,15 +289,14 @@ private static int epoch(int epochAndState) {
   }
 
   private boolean completeRolling() {
-    ReplaceWriterRequest replaceWriterRequestToUse = this.replaceWriterRequest;
-    if (replaceWriterRequestToUse == null) {
+    ReplaceWriterRequest replaceWriterRequest = this.replaceWriterRequestRef.getAndSet(null);
+    if (replaceWriterRequest == null) {
       return false;
     }
-    this.replaceWriterRequest = null;
-    if (replaceWriterRequestToUse.shutdown) {
-      this.processShutdown(replaceWriterRequestToUse);
+    if (replaceWriterRequest.shutdown) {
+      this.completeShutdownRequest(replaceWriterRequest);
     } else {
-      this.processReplaceWriter(replaceWriterRequestToUse);
+      this.completeReplaceWriterRequest(replaceWriterRequest);
     }
     return true;
   }
@@ -762,7 +763,8 @@ protected AsyncWriter createWriterInstance(Path path) throws IOException {
   }
 
   private void addReplaceWriterRequest(ReplaceWriterRequest request) {
-    this.replaceWriterRequest = request;
+    assert this.replaceWriterRequestRef.get() == null;
+    this.replaceWriterRequestRef.set(request);
     consumerScheduled.set(true);
     consumeExecutor.execute(consumer);
   }
@@ -801,11 +803,11 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite
       consumeExecutor.execute(consumer);
     } finally {
-      this.replaceWriterRequest = null;
+      this.replaceWriterRequestRef.set(null);
     }
   }
 
-  private void processReplaceWriter(ReplaceWriterRequest replaceWriterRequest) {
+  private void completeReplaceWriterRequest(ReplaceWriterRequest replaceWriterRequest) {
     try {
       Path oldPath = replaceWriterRequest.oldPath;
       AsyncWriter nextWriter = replaceWriterRequest.nextWriter;
@@ -838,11 +840,12 @@ protected void doShutdown() throws IOException {
         consumeExecutor.shutdown();
       }
     } finally {
-      this.replaceWriterRequest = null;
+      this.replaceWriterRequestRef.set(null);
+      ;
     }
   }
 
-  private void processShutdown(ReplaceWriterRequest replaceWriterRequest) {
+  private void completeShutdownRequest(ReplaceWriterRequest replaceWriterRequest) {
     try {
       closeWriter(this.writer, replaceWriterRequest.oldPath);
       this.writer = null;

From b85e93491b6ef2c5df18cc691870042b1c68e3eb Mon Sep 17 00:00:00 2001
From: comnetwork
Date: Thu, 8 Sep 2022 13:58:50 +0800
Subject: [PATCH 3/5] add check

---
 .../apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 6aa1f75a5e14..3c536687dffe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -204,7 +204,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   private final AtomicReference<ReplaceWriterRequest> replaceWriterRequestRef =
     new AtomicReference<>(null);
 
-  private boolean alreadyProcessedShutdown = false;
+  private boolean alreadyShutdown = false;
 
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
     Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix,
@@ -605,6 +605,9 @@ private void drainNonMarkerEditsAndFailSyncs() {
   }
 
   private void consume() {
+    if (this.alreadyShutdown) {
+      return;
+    }
     if (this.writer == null) {
       this.completeRolling();
       return;
@@ -649,7 +652,7 @@ private void consume() {
       drainNonMarkerEditsAndFailSyncs();
     }
     appendAndSync();
-    if (this.alreadyProcessedShutdown) {
+    if (this.alreadyShutdown) {
       return;
     }
     if (hasConsumerTask.get()) {
@@ -886,7 +889,7 @@ private void completeShutdownRequest(ReplaceWriterRequest replaceWriterRequest)
       LOG.error("shut down error!", exception);
       replaceWriterRequest.future.completeExceptionally(exception);
     } finally {
-      this.alreadyProcessedShutdown = true;
+      this.alreadyShutdown = true;
     }
   }

From e296594aedfd3533a6f69292c3f0cc2fe02e0d15 Mon Sep 17 00:00:00 2001
From: comnetwork
Date: Fri, 9 Sep 2022 11:43:47 +0800
Subject: [PATCH 4/5] fix checkstyle

---
 .../org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 3c536687dffe..5c3ec26ec975 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -844,7 +844,6 @@ protected void doShutdown() throws IOException {
       }
     } finally {
       this.replaceWriterRequestRef.set(null);
-      ;
     }
   }

From 18fb6e70633309374eac70e7345c990a3ef3ba14 Mon Sep 17 00:00:00 2001
From: comnetwork
Date: Fri, 16 Sep 2022 11:52:42 +0800
Subject: [PATCH 5/5] rename

---
 .../hbase/regionserver/wal/AsyncFSWAL.java    | 59 ++++++++++---------
 1 file changed, 31 insertions(+), 28 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 5c3ec26ec975..86073c4c0741 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -201,8 +201,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
    * {@link AbstractFSWAL#rollWriterLock}, so there is at most one method call at the same time and
    * we can just use a simple variable to save the request.
    */
-  private final AtomicReference<ReplaceWriterRequest> replaceWriterRequestRef =
-    new AtomicReference<>(null);
+  private final AtomicReference<RollingRequest> rollingRequestRef =
+    new AtomicReference<>(null);
 
   private boolean alreadyShutdown = false;
@@ -277,7 +277,7 @@ private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
   }
 
   private boolean waitingRoll() {
-    return this.replaceWriterRequestRef.get() != null;
+    return this.rollingRequestRef.get() != null;
   }
 
   private static boolean writerBroken(int epochAndState) {
@@ -289,14 +289,17 @@ private static int epoch(int epochAndState) {
   }
 
   private boolean completeRolling() {
-    ReplaceWriterRequest replaceWriterRequest = this.replaceWriterRequestRef.getAndSet(null);
-    if (replaceWriterRequest == null) {
+    /**
+     * Poll the request.
+     */
+    RollingRequest rollingRequest = this.rollingRequestRef.getAndSet(null);
+    if (rollingRequest == null) {
       return false;
     }
-    if (replaceWriterRequest.shutdown) {
-      this.completeShutdownRequest(replaceWriterRequest);
+    if (rollingRequest.shutdown) {
+      this.completeShutdown(rollingRequest);
     } else {
-      this.completeReplaceWriterRequest(replaceWriterRequest);
+      this.completeReplaceWriter(rollingRequest);
    }
     return true;
   }
@@ -765,9 +768,9 @@ protected AsyncWriter createWriterInstance(Path path) throws IOException {
     return createAsyncWriter(fs, path);
   }
 
-  private void addReplaceWriterRequest(ReplaceWriterRequest request) {
-    assert this.replaceWriterRequestRef.get() == null;
-    this.replaceWriterRequestRef.set(request);
+  private void addRollingRequest(RollingRequest request) {
+    assert this.rollingRequestRef.get() == null;
+    this.rollingRequestRef.set(request);
     consumerScheduled.set(true);
     consumeExecutor.execute(consumer);
   }
@@ -798,22 +801,22 @@ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWrite
       Preconditions.checkNotNull(nextWriter);
       AsyncWriter oldWriter = this.writer;
       long oldFileLen = oldWriter == null ? 0 : oldWriter.getLength();
-      ReplaceWriterRequest request = new ReplaceWriterRequest(oldPath, nextWriter, false);
-      addReplaceWriterRequest(request);
+      RollingRequest request = new RollingRequest(oldPath, nextWriter, false);
+      addRollingRequest(request);
       FutureUtils.get(request.future);
       logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
       rollRequested.set(false);
       consumerScheduled.set(true);
       consumeExecutor.execute(consumer);
     } finally {
-      this.replaceWriterRequestRef.set(null);
+      this.rollingRequestRef.set(null);
     }
   }
 
-  private void completeReplaceWriterRequest(ReplaceWriterRequest replaceWriterRequest) {
+  private void completeReplaceWriter(RollingRequest rollingRequest) {
     try {
-      Path oldPath = replaceWriterRequest.oldPath;
-      AsyncWriter nextWriter = replaceWriterRequest.nextWriter;
+      Path oldPath = rollingRequest.oldPath;
+      AsyncWriter nextWriter = rollingRequest.nextWriter;
       closeWriter(this.writer, oldPath);
       this.writer = nextWriter;
       if (nextWriter instanceof AsyncProtobufLogWriter) {
@@ -826,30 +829,30 @@ private void completeReplaceWriterRequ
       int currentEpoch = epochAndState >>> 2;
       int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
       // set a new epoch and also clear waitingRoll and writerBroken
       this.epochAndState = nextEpoch << 2;
-      replaceWriterRequest.future.complete(true);
+      rollingRequest.future.complete(true);
     } catch (Exception exception) {
       LOG.error("processReplaceWriter error!", exception);
-      replaceWriterRequest.future.completeExceptionally(exception);
+      rollingRequest.future.completeExceptionally(exception);
     }
   }
 
   @Override
   protected void doShutdown() throws IOException {
     try {
-      ReplaceWriterRequest request = new ReplaceWriterRequest(getOldPath(), null, true);
-      addReplaceWriterRequest(request);
+      RollingRequest request = new RollingRequest(getOldPath(), null, true);
+      addRollingRequest(request);
       FutureUtils.get(request.future);
       if (!(consumeExecutor instanceof EventLoop)) {
         consumeExecutor.shutdown();
       }
     } finally {
-      this.replaceWriterRequestRef.set(null);
+      this.rollingRequestRef.set(null);
     }
   }
 
-  private void completeShutdownRequest(ReplaceWriterRequest replaceWriterRequest) {
+  private void completeShutdown(RollingRequest rollingRequest) {
     try {
-      closeWriter(this.writer, replaceWriterRequest.oldPath);
+      closeWriter(this.writer, rollingRequest.oldPath);
       this.writer = null;
       closeExecutor.shutdown();
       try {
@@ -883,10 +886,10 @@ private void completeShutdownRequest(ReplaceWriterRequest replaceWriterRequest)
       }
       // and fail them
       syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
-      replaceWriterRequest.future.complete(true);
+      rollingRequest.future.complete(true);
     } catch (Exception exception) {
       LOG.error("shut down error!", exception);
-      replaceWriterRequest.future.completeExceptionally(exception);
+      rollingRequest.future.completeExceptionally(exception);
     } finally {
       this.alreadyShutdown = true;
     }
@@ -916,13 +919,13 @@ protected boolean doCheckLogLowReplication() {
     return output != null && output.isBroken();
   }
 
-  static class ReplaceWriterRequest {
+  static class RollingRequest {
     private final CompletableFuture<Boolean> future;
     private final Path oldPath;
     private final AsyncWriter nextWriter;
     private final boolean shutdown;
 
-    ReplaceWriterRequest(Path oldPath, AsyncWriter nextWriter, boolean shutdown) {
+    RollingRequest(Path oldPath, AsyncWriter nextWriter, boolean shutdown) {
       this.oldPath = oldPath;
       this.nextWriter = nextWriter;
       this.future = new CompletableFuture<>();
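
For readers who want the shape of the handshake without the diff noise, the sketch below models the pattern the series converges on: the roller thread publishes a single pending request through an AtomicReference, schedules the consumer, and blocks on the request's CompletableFuture, while the consumer polls the slot and completes the future once it reaches a safe point. The class and method names here (RollHandshakeSketch, RollRequest, requestRoll) are illustrative only and are not part of the patch; error handling and the real WAL state machine are omitted.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class RollHandshakeSketch {

  static final class RollRequest {
    final CompletableFuture<Boolean> done = new CompletableFuture<>();
  }

  // Single pending slot; enough because callers are serialized by an external lock,
  // as doReplaceWriter/doShutdown are by AbstractFSWAL#rollWriterLock in the patch.
  private final AtomicReference<RollRequest> pendingRequest = new AtomicReference<>();
  private final ExecutorService consumeExecutor = Executors.newSingleThreadExecutor();

  // Roller side: publish the request, wake the consumer, block until it is completed.
  void requestRoll() throws Exception {
    RollRequest request = new RollRequest();
    pendingRequest.set(request);
    consumeExecutor.execute(this::consume);
    request.done.get();
  }

  // Consumer side: runs only on the single consume thread, so the state it touches
  // needs no lock. Poll the slot and complete the future when the work is done.
  private void consume() {
    // ... drain outstanding appends and syncs here ...
    RollRequest request = pendingRequest.getAndSet(null);
    if (request != null) {
      // close the old writer / install the new one here, then release the roller thread
      request.done.complete(true);
    }
  }
}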