diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 3e41a3f5ea7..4a8e8b4ad77 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -142,7 +142,7 @@ public Object encode(Object msg, ByteBufAllocator allocator) if (r.hasMasterKey()) { buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); } - + r.recycle(); return buf; } else if (r instanceof BookieProtocol.AuthRequest) { BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage(); @@ -193,9 +193,9 @@ public Object decode(ByteBuf packet) if ((flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING && version >= 2) { byte[] masterKey = readMasterKey(packet); - return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, masterKey); + return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, masterKey); } else { - return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, null); + return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, null); } case BookieProtocol.AUTH: BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 89654449aad..86c3ed54693 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -362,14 +362,43 @@ public void recycle() { * A Request that reads data. */ class ReadRequest extends Request { - ReadRequest(byte protocolVersion, long ledgerId, long entryId, - short flags, byte[] masterKey) { - init(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey); + + static ReadRequest create(byte protocolVersion, long ledgerId, long entryId, + short flags, byte[] masterKey) { + ReadRequest read = RECYCLER.get(); + read.protocolVersion = protocolVersion; + read.opCode = READENTRY; + read.ledgerId = ledgerId; + read.entryId = entryId; + read.flags = flags; + read.masterKey = masterKey; + return read; } boolean isFencing() { return (flags & FLAG_DO_FENCING) == FLAG_DO_FENCING; } + + private final Handle recyclerHandle; + + private ReadRequest(Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private static final Recycler RECYCLER = new Recycler() { + @Override + protected ReadRequest newObject(Handle handle) { + return new ReadRequest(handle); + } + }; + + @Override + public void recycle() { + ledgerId = -1; + entryId = -1; + masterKey = null; + recyclerHandle.recycle(this); + } } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 47d5303bb37..4d08faad202 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -849,7 +849,7 @@ public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) { Object request = null; CompletionKey completionKey = null; if (useV2WireProtocol) { - request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, + request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 0, (short) 0, null); completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC); } else { @@ -933,7 +933,7 @@ private void readEntryInternal(final long ledgerId, Object request = null; CompletionKey completionKey = null; if (useV2WireProtocol) { - request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, + request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, (short) flags, masterKey); completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY); } else { @@ -1168,7 +1168,6 @@ private void writeAndFlush(final Channel channel, nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); - channel.writeAndFlush(request, promise); } catch (Throwable e) { LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 6935ca8be60..c44216a0283 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -189,6 +189,7 @@ public String toString() { } private void recycle() { + request.recycle(); super.reset(); this.recyclerHandle.recycle(this); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java index 91e100d8093..251f900c096 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java @@ -107,7 +107,7 @@ private void testAsynchronousRequest(boolean result, int errorCode) throws Excep ExecutorService service = Executors.newCachedThreadPool(); long ledgerId = System.currentTimeMillis(); - ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{}); ReadEntryProcessor processor = ReadEntryProcessor.create( request, requestHandler, requestProcessor, service, true); @@ -150,7 +150,7 @@ private void testSynchronousRequest(boolean result, int errorCode) throws Except }).when(channel).writeAndFlush(any(Response.class)); long ledgerId = System.currentTimeMillis(); - ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, BookieProtocol.FLAG_DO_FENCING, new byte[]{}); ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true); fenceResult.complete(result); @@ -180,7 +180,7 @@ public void testNonFenceRequest() throws Exception { }).when(channel).writeAndFlush(any(Response.class)); long ledgerId = System.currentTimeMillis(); - ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, + ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{}); ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true); processor.run(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java index 23997348e1b..1edd74c1fc6 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java @@ -161,8 +161,9 @@ public void testAuthFail() throws Exception { } - client.sendRequest(new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, - 1L, 1L, (short) 0, null)); + ReadRequest read = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, + 1L, 1L, (short) 0, null); + client.sendRequest(read); Response response = client.takeResponse(); assertEquals("Should have failed", response.getErrorCode(), BookieProtocol.EUA);