From 44828bae3fcc9032524bbcff557cc4bc485d8ab1 Mon Sep 17 00:00:00 2001 From: penghui Date: Mon, 6 Mar 2023 18:02:39 +0800 Subject: [PATCH 1/4] Make read entry request recyclable --- .../bookkeeper/proto/BookieProtoEncoding.java | 4 +-- .../bookkeeper/proto/BookieProtocol.java | 34 +++++++++++++++++-- .../proto/PerChannelBookieClient.java | 8 +++-- .../bookkeeper/proto/ReadEntryProcessor.java | 1 + .../proto/ReadEntryProcessorTest.java | 6 ++-- .../proto/TestBackwardCompatCMS42.java | 6 ++-- 6 files changed, 47 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 3e41a3f5ea7..67fa8d998ae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -193,9 +193,9 @@ public Object decode(ByteBuf packet) if ((flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING && version >= 2) { byte[] masterKey = readMasterKey(packet); - return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, masterKey); + return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, masterKey); } else { - return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, null); + return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, null); } case BookieProtocol.AUTH: BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 89654449aad..3479117cbd7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -362,14 +362,42 @@ public void recycle() { * A Request that reads data. */ class ReadRequest extends Request { - ReadRequest(byte protocolVersion, long ledgerId, long entryId, - short flags, byte[] masterKey) { - init(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey); + + static ReadRequest create(byte protocolVersion, long ledgerId, long entryId, + short flags, byte[] masterKey) { + ReadRequest read = RECYCLER.get(); + read.protocolVersion = protocolVersion; + read.ledgerId = ledgerId; + read.entryId = entryId; + read.flags = flags; + read.masterKey = masterKey; + return read; } boolean isFencing() { return (flags & FLAG_DO_FENCING) == FLAG_DO_FENCING; } + + private final Handle recyclerHandle; + + private ReadRequest(Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected ReadRequest newObject(Handle handle) { + return new ReadRequest(handle); + } + }; + + @Override + public void recycle() { + ledgerId = -1; + entryId = -1; + masterKey = null; + recyclerHandle.recycle(this); + } } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 47d5303bb37..92046428c5b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -849,7 +849,7 @@ public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) { Object request = null; CompletionKey completionKey = null; if (useV2WireProtocol) { - request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, + request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 0, (short) 0, null); completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC); } else { @@ -933,7 +933,7 @@ private void readEntryInternal(final long ledgerId, Object request = null; CompletionKey completionKey = null; if (useV2WireProtocol) { - request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, + request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, (short) flags, masterKey); completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY); } else { @@ -1170,6 +1170,10 @@ private void writeAndFlush(final Channel channel, }); channel.writeAndFlush(request, promise); + if (request instanceof BookieProtocol.Request) { + BookieProtocol.Request r = (BookieProtocol.Request) request; + r.recycle(); + } } catch (Throwable e) { LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); errorOut(key); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 6935ca8be60..64583dbbfe4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -122,6 +122,7 @@ protected void processPacket() { if (LOG.isTraceEnabled()) { LOG.trace("Read entry rc = {} for {}", errorCode, request); } + request.recycle(); sendResponse(data, errorCode, startTimeNanos); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java index 91e100d8093..251f900c096 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java @@ -107,7 +107,7 @@ private void testAsynchronousRequest(boolean result, int errorCode) throws Excep ExecutorService service = Executors.newCachedThreadPool(); long ledgerId = System.currentTimeMillis(); - ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{}); ReadEntryProcessor processor = ReadEntryProcessor.create( request, requestHandler, requestProcessor, service, true); @@ -150,7 +150,7 @@ private void testSynchronousRequest(boolean result, int errorCode) throws Except }).when(channel).writeAndFlush(any(Response.class)); long ledgerId = System.currentTimeMillis(); - ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{}); ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true); fenceResult.complete(result); @@ -180,7 +180,7 @@ public void testNonFenceRequest() throws Exception { }).when(channel).writeAndFlush(any(Response.class)); long ledgerId = System.currentTimeMillis(); - ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{}); ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true); processor.run(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java index 23997348e1b..e8e5fe55233 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java @@ -161,11 +161,13 @@ public void testAuthFail() throws Exception { } - client.sendRequest(new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, - 1L, 1L, (short) 0, null)); + ReadRequest read = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, + 1L, 1L, (short) 0, null); + client.sendRequest(read); Response response = client.takeResponse(); assertEquals("Should have failed", response.getErrorCode(), BookieProtocol.EUA); + read.recycle(); } // copy from TestAuth From 9715ea871f44a7aefbde46952901df8e82cef1f0 Mon Sep 17 00:00:00 2001 From: penghui Date: Tue, 7 Mar 2023 09:20:48 +0800 Subject: [PATCH 2/4] Move recycle to finally block --- .../bookkeeper/proto/PerChannelBookieClient.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 92046428c5b..b26a38027c1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1143,14 +1143,6 @@ private void writeAndFlush(final Channel channel, StringUtils.requestToString(request)); errorOut(key, BKException.Code.TooManyRequestsException); - - // If the request is a V2 add request, we retained the data's reference when creating the AddRequest - // object. To avoid the object leak, we need to release the reference if we met any errors - // before sending it. - if (request instanceof BookieProtocol.AddRequest) { - BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) request; - ar.recycle(); - } return; } @@ -1170,13 +1162,14 @@ private void writeAndFlush(final Channel channel, }); channel.writeAndFlush(request, promise); + } catch (Throwable e) { + LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); + errorOut(key); + } finally { if (request instanceof BookieProtocol.Request) { BookieProtocol.Request r = (BookieProtocol.Request) request; r.recycle(); } - } catch (Throwable e) { - LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); - errorOut(key); } } From 9e932dd483eb751725862a5fd31cf2f7ca7cc433 Mon Sep 17 00:00:00 2001 From: penghui Date: Tue, 7 Mar 2023 21:33:44 +0800 Subject: [PATCH 3/4] Fix test and comments --- .../bookkeeper/proto/BookieProtoEncoding.java | 2 +- .../apache/bookkeeper/proto/BookieProtocol.java | 1 + .../bookkeeper/proto/PerChannelBookieClient.java | 14 ++++++++------ .../bookkeeper/proto/ReadEntryProcessor.java | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 67fa8d998ae..4a8e8b4ad77 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -142,7 +142,7 @@ public Object encode(Object msg, ByteBufAllocator allocator) if (r.hasMasterKey()) { buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); } - + r.recycle(); return buf; } else if (r instanceof BookieProtocol.AuthRequest) { BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 3479117cbd7..86c3ed54693 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -367,6 +367,7 @@ static ReadRequest create(byte protocolVersion, long ledgerId, long entryId, short flags, byte[] masterKey) { ReadRequest read = RECYCLER.get(); read.protocolVersion = protocolVersion; + read.opCode = READENTRY; read.ledgerId = ledgerId; read.entryId = entryId; read.flags = flags; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index b26a38027c1..4d08faad202 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -1143,6 +1143,14 @@ private void writeAndFlush(final Channel channel, StringUtils.requestToString(request)); errorOut(key, BKException.Code.TooManyRequestsException); + + // If the request is a V2 add request, we retained the data's reference when creating the AddRequest + // object. To avoid the object leak, we need to release the reference if we met any errors + // before sending it. + if (request instanceof BookieProtocol.AddRequest) { + BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) request; + ar.recycle(); + } return; } @@ -1160,16 +1168,10 @@ private void writeAndFlush(final Channel channel, nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); - channel.writeAndFlush(request, promise); } catch (Throwable e) { LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); errorOut(key); - } finally { - if (request instanceof BookieProtocol.Request) { - BookieProtocol.Request r = (BookieProtocol.Request) request; - r.recycle(); - } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 64583dbbfe4..c44216a0283 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -122,7 +122,6 @@ protected void processPacket() { if (LOG.isTraceEnabled()) { LOG.trace("Read entry rc = {} for {}", errorCode, request); } - request.recycle(); sendResponse(data, errorCode, startTimeNanos); } @@ -190,6 +189,7 @@ public String toString() { } private void recycle() { + request.recycle(); super.reset(); this.recyclerHandle.recycle(this); } From a29880ca6c2818895666c8cf530228c5a9ca5c7a Mon Sep 17 00:00:00 2001 From: penghui Date: Tue, 7 Mar 2023 22:21:21 +0800 Subject: [PATCH 4/4] Fix test --- .../org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java index e8e5fe55233..1edd74c1fc6 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java @@ -167,7 +167,6 @@ public void testAuthFail() throws Exception { Response response = client.takeResponse(); assertEquals("Should have failed", response.getErrorCode(), BookieProtocol.EUA); - read.recycle(); } // copy from TestAuth