From d8c5de7f774cb33b4d47da1728b90d16b3d3285b Mon Sep 17 00:00:00 2001 From: chenglei Date: Mon, 13 Jul 2020 16:25:56 +0800 Subject: [PATCH 1/4] HBASE-24625 addnum --- .../hbase/io/asyncfs/WrapperAsyncFSOutput.java | 8 +++++--- .../regionserver/wal/AsyncProtobufLogWriter.java | 14 ++++++++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java index 39f1f71e2473..c7cc1fcfcb4b 100644 --- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java +++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java @@ -94,9 +94,11 @@ private void flush0(CompletableFuture future, ByteArrayOutputStream buffer } } long pos = out.getPos(); - if(pos > this.syncedLength) { - this.syncedLength = pos; - } + /** + * This flush0 method could only be called by single thread, so here we could + * safely overwrite without any synchronization. + */ + this.syncedLength = pos; future.complete(pos); } catch (IOException e) { future.completeExceptionally(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 8c944b1bdf57..a6fff1e84300 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -59,7 +59,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private final Class channelClass; - private AsyncFSOutput output; + private volatile AsyncFSOutput output; private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter { @@ -234,6 +234,16 @@ protected OutputStream getOutputStreamForCellEncoder() { @Override public long getSyncedLength() { - return this.output.getSyncedLength(); + AsyncFSOutput outputToUse = this.output; + if(outputToUse == null) { + /** + * When this method is called and output is null,it means the caller + * may incorrectly call this method because some synchronizing errors or other, + * so we should explicitly throw exception to indicate this illegal state. + */ + throw new IllegalStateException("The output is null when getSyncedLength is called," + + "it is a illegal state might caused by some synchronizing errors or other"); + } + return outputToUse.getSyncedLength(); } } From 91eac7ce9046e5b12ea8226ab1e0d660fa682745 Mon Sep 17 00:00:00 2001 From: chenglei Date: Mon, 13 Jul 2020 16:28:10 +0800 Subject: [PATCH 2/4] HBASE-24625 addnum --- .../hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index a6fff1e84300..dbbc09632e17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -237,7 +237,7 @@ public long getSyncedLength() { AsyncFSOutput outputToUse = this.output; if(outputToUse == null) { /** - * When this method is called and output is null,it means the caller + * When this method is called and output is null, it means the caller * may incorrectly call this method because some synchronizing errors or other, * so we should explicitly throw exception to indicate this illegal state. */ From efb161f6791207b0cb9511fdf38e64ca819ad6ee Mon Sep 17 00:00:00 2001 From: chenglei Date: Wed, 22 Jul 2020 17:47:44 +0800 Subject: [PATCH 3/4] add finalSyncedLength to support getSyncedLength after AsyncProtobufWriter.close --- .../wal/AsyncProtobufLogWriter.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index dbbc09632e17..741affe52939 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -60,6 +60,10 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private final Class channelClass; private volatile AsyncFSOutput output; + /** + * Save {@link AsyncFSOutput#getSyncedLength()} when {@link #output} is closed. + */ + private volatile long finalSyncedLength = -1; private static final class OutputStreamWrapper extends OutputStream implements ByteBufferWriter { @@ -156,6 +160,7 @@ public synchronized void close() throws IOException { LOG.warn("normal close failed, try recover", e); output.recoverAndClose(null); } + this.finalSyncedLength = this.output.getSyncedLength(); this.output = null; } @@ -234,15 +239,16 @@ protected OutputStream getOutputStreamForCellEncoder() { @Override public long getSyncedLength() { + /** + * The last statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} + * is a sync point, if output is null, then finalSyncedLength must set, + * so we can return finalSyncedLength, else we return output.getSyncedLength + */ AsyncFSOutput outputToUse = this.output; if(outputToUse == null) { - /** - * When this method is called and output is null, it means the caller - * may incorrectly call this method because some synchronizing errors or other, - * so we should explicitly throw exception to indicate this illegal state. - */ - throw new IllegalStateException("The output is null when getSyncedLength is called," + - "it is a illegal state might caused by some synchronizing errors or other"); + long finalSyncedLengthToUse = this.finalSyncedLength; + assert finalSyncedLengthToUse >= 0; + return finalSyncedLengthToUse; } return outputToUse.getSyncedLength(); } From 255b26b380f19ec2599c41048f1b6d05ce1c7f97 Mon Sep 17 00:00:00 2001 From: chenglei Date: Wed, 22 Jul 2020 18:01:18 +0800 Subject: [PATCH 4/4] add finalSyncedLength to support getSyncedLength after AsyncProtobufWriter.close --- .../hbase/regionserver/wal/AsyncProtobufLogWriter.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 741affe52939..e834d654310c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -160,6 +160,12 @@ public synchronized void close() throws IOException { LOG.warn("normal close failed, try recover", e); output.recoverAndClose(null); } + /** + * We have to call {@link AsyncFSOutput#getSyncedLength()} + * after {@link AsyncFSOutput#close()} to get the final length + * synced to underlying filesystem because {@link AsyncFSOutput#close()} + * may also flush some data to underlying filesystem. + */ this.finalSyncedLength = this.output.getSyncedLength(); this.output = null; } @@ -240,7 +246,7 @@ protected OutputStream getOutputStreamForCellEncoder() { @Override public long getSyncedLength() { /** - * The last statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} + * The statement "this.output = null;" in {@link AsyncProtobufLogWriter#close} * is a sync point, if output is null, then finalSyncedLength must set, * so we can return finalSyncedLength, else we return output.getSyncedLength */