From b62ba8676f34efebf033650f69de046e1a53db2a Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 16 Apr 2024 13:27:57 +0300 Subject: [PATCH 1/8] fix: reference counts in PerChannelBookieClient --- .../bookkeeper/proto/BookieClientImpl.java | 38 +++--- .../proto/PerChannelBookieClient.java | 127 +++++++++++------- 2 files changed, 99 insertions(+), 66 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index 8d7742e7848..a12d9fd64d5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -261,18 +261,20 @@ public void writeLac(final BookieId addr, final long ledgerId, final byte[] mast toSend.retain(); client.obtain((rc, pcbc) -> { - if (rc != BKException.Code.OK) { - try { - executor.executeOrdered(ledgerId, - () -> cb.writeLacComplete(rc, ledgerId, addr, ctx)); - } catch (RejectedExecutionException re) { - cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); + try { + if (rc != BKException.Code.OK) { + try { + executor.executeOrdered(ledgerId, + () -> cb.writeLacComplete(rc, ledgerId, addr, ctx)); + } catch (RejectedExecutionException re) { + cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); + } + } else { + pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); } - } else { - pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); + } finally { + ReferenceCountUtil.release(toSend); } - - ReferenceCountUtil.release(toSend); }, ledgerId, useV3Enforced); } @@ -407,14 +409,16 @@ static ChannelReadyForAddEntryCallback create( @Override public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx); - } else { - pcbc.addEntry(ledgerId, masterKey, entryId, - toSend, cb, ctx, options, allowFastFail, writeFlags); + try { + if (rc != BKException.Code.OK) { + bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx); + } else { + pcbc.addEntry(ledgerId, masterKey, entryId, + toSend, cb, ctx, options, allowFastFail, writeFlags); + } + } finally { + ReferenceCountUtil.release(toSend); } - - ReferenceCountUtil.release(toSend); recycle(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 8485d21b74f..79c4f357cd1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -69,6 +69,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import java.security.cert.Certificate; import java.util.ArrayDeque; import java.util.ArrayList; @@ -700,14 +701,10 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB .setVersion(ProtocolVersion.VERSION_THREE) .setOperation(OperationType.WRITE_LAC) .setTxnId(txnId); - ByteString body; - if (toSend.hasArray()) { - body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes()); - } else if (toSend.size() == 1) { - body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer()); - } else { - body = UnsafeByteOperations.unsafeWrap(toSend.toArray()); - } + ByteString body = byteBufListToByteString(toSend); + toSend.retain(); + Runnable cleanupActionFailedBeforeWrite = toSend::release; + Runnable cleanupActionAfterWrite = cleanupActionFailedBeforeWrite; WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder() .setLedgerId(ledgerId) .setLac(lac) @@ -718,7 +715,8 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB .setHeader(headerBuilder) .setWriteLacRequest(writeLacBuilder) .build(); - writeAndFlush(channel, completionKey, writeLacRequest); + writeAndFlush(channel, completionKey, writeLacRequest, false, cleanupActionFailedBeforeWrite, + cleanupActionAfterWrite); } void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) { @@ -777,6 +775,8 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen Object ctx, final int options, boolean allowFastFail, final EnumSet writeFlags) { Object request = null; CompletionKey completionKey = null; + Runnable cleanupActionFailedBeforeWrite = null; + Runnable cleanupActionAfterWrite = null; if (useV2WireProtocol) { if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) { LOG.error("invalid writeflags {} for v2 protocol", writeFlags); @@ -786,9 +786,13 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY); if (toSend instanceof ByteBuf) { - request = ((ByteBuf) toSend).retainedDuplicate(); + ByteBuf byteBuf = ((ByteBuf) toSend).retainedDuplicate(); + request = byteBuf; + cleanupActionFailedBeforeWrite = byteBuf::release; } else { - request = ByteBufList.clone((ByteBufList) toSend); + ByteBufList byteBufList = ByteBufList.clone((ByteBufList) toSend); + request = byteBufList; + cleanupActionFailedBeforeWrite = byteBufList::release; } } else { final long txnId = getTxnId(); @@ -803,19 +807,11 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE); } - ByteString body = null; ByteBufList bufToSend = (ByteBufList) toSend; - - if (bufToSend.hasArray()) { - body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(), - bufToSend.readableBytes()); - } else { - for (int i = 0; i < bufToSend.size(); i++) { - ByteString piece = UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer()); - // use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs - body = (body == null) ? piece : body.concat(piece); - } - } + ByteString body = byteBufListToByteString(bufToSend); + bufToSend.retain(); + cleanupActionFailedBeforeWrite = bufToSend::release; + cleanupActionAfterWrite = cleanupActionFailedBeforeWrite; AddRequest.Builder addBuilder = AddRequest.newBuilder() .setLedgerId(ledgerId) .setEntryId(entryId) @@ -840,17 +836,39 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen putCompletionKeyValue(completionKey, acquireAddCompletion(completionKey, cb, ctx, ledgerId, entryId)); - final Channel c = channel; - if (c == null) { - // Manually release the binary data(variable "request") that we manually created when it can not be sent out - // because the channel is switching. - errorOut(completionKey); - ReferenceCountUtil.release(request); - return; - } else { - // addEntry times out on backpressure - writeAndFlush(c, completionKey, request, allowFastFail); + // addEntry times out on backpressure + writeAndFlush(channel, completionKey, request, allowFastFail, cleanupActionFailedBeforeWrite, + cleanupActionAfterWrite); + } + + /** + * Wrap the internal buffers of a ByteBufList into a single ByteString. + * The lifecycle of the wrapped ByteString is tied to the ByteBufList. + * + * @param bufList ByteBufList to wrap + * @return ByteString wrapping the internal buffers of the ByteBufList + */ + private static ByteString byteBufListToByteString(ByteBufList bufList) { + ByteString aggregated = null; + for (int i = 0; i < bufList.size(); i++) { + ByteBuf buffer = bufList.getBuffer(i); + ByteString piece; + if (buffer.nioBufferCount() > 1) { + for (ByteBuffer nioBuffer : buffer.nioBuffers()) { + piece = UnsafeByteOperations.unsafeWrap(nioBuffer); + aggregated = (aggregated == null) ? piece : aggregated.concat(piece); + } + } else { + if (buffer.hasArray()) { + piece = UnsafeByteOperations.unsafeWrap(buffer.array(), buffer.arrayOffset(), + buffer.readableBytes()); + } else { + piece = UnsafeByteOperations.unsafeWrap(buffer.nioBuffer()); + } + aggregated = (aggregated == null) ? piece : aggregated.concat(piece); + } } + return aggregated != null ? aggregated : ByteString.EMPTY; } public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) { @@ -1005,7 +1023,7 @@ private void readEntryInternal(final long ledgerId, ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId); putCompletionKeyValue(completionKey, readCompletion); - writeAndFlush(channel, completionKey, request, allowFastFail); + writeAndFlush(channel, completionKey, request, allowFastFail, null, null); } public void batchReadEntries(final long ledgerId, @@ -1048,7 +1066,7 @@ private void batchReadEntriesInternal(final long ledgerId, completionKey, cb, ctx, ledgerId, startEntryId); putCompletionKeyValue(completionKey, readCompletion); - writeAndFlush(channel, completionKey, request, allowFastFail); + writeAndFlush(channel, completionKey, request, allowFastFail, null, null); } public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) { @@ -1170,17 +1188,20 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio private void writeAndFlush(final Channel channel, final CompletionKey key, final Object request) { - writeAndFlush(channel, key, request, false); + writeAndFlush(channel, key, request, false, null, null); } private void writeAndFlush(final Channel channel, final CompletionKey key, final Object request, - final boolean allowFastFail) { + final boolean allowFastFail, final Runnable cleanupActionFailedBeforeWrite, + final Runnable cleanupActionAfterWrite) { if (channel == null) { LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request)); errorOut(key); - ReferenceCountUtil.release(request); + if (cleanupActionFailedBeforeWrite != null) { + cleanupActionFailedBeforeWrite.run(); + } return; } @@ -1195,7 +1216,9 @@ private void writeAndFlush(final Channel channel, StringUtils.requestToString(request)); errorOut(key, BKException.Code.TooManyRequestsException); - ReferenceCountUtil.release(request); + if (cleanupActionFailedBeforeWrite != null) { + cleanupActionFailedBeforeWrite.run(); + } return; } @@ -1203,23 +1226,29 @@ private void writeAndFlush(final Channel channel, final long startTime = MathUtils.nowInNano(); ChannelPromise promise = channel.newPromise().addListener(future -> { - if (future.isSuccess()) { - nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); - CompletionValue completion = completionObjects.get(key); - if (completion != null) { - completion.setOutstanding(); + try { + if (future.isSuccess()) { + nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); + CompletionValue completion = completionObjects.get(key); + if (completion != null) { + completion.setOutstanding(); + } + } else { + nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); + } + } finally { + if (cleanupActionAfterWrite != null) { + cleanupActionAfterWrite.run(); } - } else { - nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); channel.writeAndFlush(request, promise); } catch (Throwable e) { LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); errorOut(key); - // If the request goes into the writeAndFlush, it should be handled well by Netty. So all the exceptions we - // get here, we can release the request. - ReferenceCountUtil.release(request); + if (cleanupActionFailedBeforeWrite != null) { + cleanupActionFailedBeforeWrite.run(); + } } } From 230271cb9303a7d02bb7ce2e3048bf89fc83dd69 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 22 Apr 2024 17:41:26 +0300 Subject: [PATCH 2/8] Extract helper class --- .../bookkeeper/proto/ByteStringUtil.java | 74 +++++++++++++++++++ .../proto/PerChannelBookieClient.java | 35 +-------- 2 files changed, 76 insertions(+), 33 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java new file mode 100644 index 00000000000..4ff202922b4 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.proto; + +import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; +import io.netty.buffer.ByteBuf; +import java.nio.ByteBuffer; +import org.apache.bookkeeper.util.ByteBufList; + +public class ByteStringUtil { + + /** + * Wrap the internal buffers of a ByteBufList into a single ByteString. + * The lifecycle of the wrapped ByteString is tied to the ByteBufList. + * + * @param bufList ByteBufList to wrap + * @return ByteString wrapping the internal buffers of the ByteBufList + */ + public static ByteString byteBufListToByteString(ByteBufList bufList) { + ByteString aggregated = null; + for (int i = 0; i < bufList.size(); i++) { + aggregated = byteBufToByteString(aggregated, bufList.getBuffer(i)); + } + return aggregated != null ? aggregated : ByteString.EMPTY; + } + + /** + * Wrap the internal buffers of a ByteBuf into a single ByteString. + * The lifecycle of the wrapped ByteString is tied to the ByteBuf. + * + * @param byteBuf ByteBuf to wrap + * @return ByteString wrapping the internal buffers of the ByteBuf + */ + public static ByteString byteBufToByteString(ByteBuf byteBuf) { + return byteBufToByteString(null, byteBuf); + } + + // internal method to aggregate a ByteBuf into a single aggregated ByteString + private static ByteString byteBufToByteString(ByteString aggregated, ByteBuf byteBuf) { + if (byteBuf.nioBufferCount() > 1) { + for (ByteBuffer nioBuffer : byteBuf.nioBuffers()) { + ByteString piece = UnsafeByteOperations.unsafeWrap(nioBuffer); + aggregated = (aggregated == null) ? piece : aggregated.concat(piece); + } + } else { + ByteString piece; + if (byteBuf.hasArray()) { + piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(), byteBuf.arrayOffset(), + byteBuf.readableBytes()); + } else { + piece = UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer()); + } + aggregated = (aggregated == null) ? piece : aggregated.concat(piece); + } + return aggregated; + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 79c4f357cd1..b4e271b2ea6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -69,7 +69,6 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; -import java.nio.ByteBuffer; import java.security.cert.Certificate; import java.util.ArrayDeque; import java.util.ArrayList; @@ -701,7 +700,7 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB .setVersion(ProtocolVersion.VERSION_THREE) .setOperation(OperationType.WRITE_LAC) .setTxnId(txnId); - ByteString body = byteBufListToByteString(toSend); + ByteString body = ByteStringUtil.byteBufListToByteString(toSend); toSend.retain(); Runnable cleanupActionFailedBeforeWrite = toSend::release; Runnable cleanupActionAfterWrite = cleanupActionFailedBeforeWrite; @@ -808,7 +807,7 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen } ByteBufList bufToSend = (ByteBufList) toSend; - ByteString body = byteBufListToByteString(bufToSend); + ByteString body = ByteStringUtil.byteBufListToByteString(bufToSend); bufToSend.retain(); cleanupActionFailedBeforeWrite = bufToSend::release; cleanupActionAfterWrite = cleanupActionFailedBeforeWrite; @@ -841,36 +840,6 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen cleanupActionAfterWrite); } - /** - * Wrap the internal buffers of a ByteBufList into a single ByteString. - * The lifecycle of the wrapped ByteString is tied to the ByteBufList. - * - * @param bufList ByteBufList to wrap - * @return ByteString wrapping the internal buffers of the ByteBufList - */ - private static ByteString byteBufListToByteString(ByteBufList bufList) { - ByteString aggregated = null; - for (int i = 0; i < bufList.size(); i++) { - ByteBuf buffer = bufList.getBuffer(i); - ByteString piece; - if (buffer.nioBufferCount() > 1) { - for (ByteBuffer nioBuffer : buffer.nioBuffers()) { - piece = UnsafeByteOperations.unsafeWrap(nioBuffer); - aggregated = (aggregated == null) ? piece : aggregated.concat(piece); - } - } else { - if (buffer.hasArray()) { - piece = UnsafeByteOperations.unsafeWrap(buffer.array(), buffer.arrayOffset(), - buffer.readableBytes()); - } else { - piece = UnsafeByteOperations.unsafeWrap(buffer.nioBuffer()); - } - aggregated = (aggregated == null) ? piece : aggregated.concat(piece); - } - } - return aggregated != null ? aggregated : ByteString.EMPTY; - } - public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) { Object request = null; CompletionKey completionKey = null; From 59dd9cdb8bac3a91d12635425e37dc0d0136c15e Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 18:03:41 +0300 Subject: [PATCH 3/8] Add solution to ByteBufList.Encoder to release after the write completes --- .../apache/bookkeeper/util/ByteBufList.java | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java index b363e4da636..e1837b5ce66 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java @@ -286,19 +286,29 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (msg instanceof ByteBufList) { ByteBufList b = (ByteBufList) msg; - try { - // Write each buffer individually on the socket. The retain() here is needed to preserve the fact - // that ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased - // and it gets written multiple times, the individual buffers refcount should be reflected as well. - int buffersCount = b.buffers.size(); - for (int i = 0; i < buffersCount; i++) { - ByteBuf bx = b.buffers.get(i); - // Last buffer will carry on the final promise to notify when everything was written on the - // socket - ctx.write(bx.retainedDuplicate(), i == (buffersCount - 1) ? promise : ctx.voidPromise()); + ChannelPromise compositePromise = ctx.newPromise(); + compositePromise.addListener(future -> { + // release the ByteBufList after the write operation is completed + ReferenceCountUtil.safeRelease(b); + // complete the promise passed as an argument unless it's a void promise + if (!promise.isVoid()) { + if (future.isSuccess()) { + promise.setSuccess(); + } else { + promise.setFailure(future.cause()); + } } - } finally { - ReferenceCountUtil.release(b); + }); + + // Write each buffer individually on the socket. The retain() here is needed to preserve the fact + // that ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased + // and it gets written multiple times, the individual buffers refcount should be reflected as well. + int buffersCount = b.buffers.size(); + for (int i = 0; i < buffersCount; i++) { + ByteBuf bx = b.buffers.get(i); + // Last buffer will carry on the final promise to notify when everything was written on the + // socket + ctx.write(bx.retainedDuplicate(), i == (buffersCount - 1) ? compositePromise : ctx.voidPromise()); } } else { ctx.write(msg, promise); From d05e976376c6f8d279047379abdc08bbe23f8160 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 18:13:17 +0300 Subject: [PATCH 4/8] Remove unnecessary clone of ByteBufList --- .../org/apache/bookkeeper/proto/PerChannelBookieClient.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index b4e271b2ea6..0e5335ca423 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -789,7 +789,8 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen request = byteBuf; cleanupActionFailedBeforeWrite = byteBuf::release; } else { - ByteBufList byteBufList = ByteBufList.clone((ByteBufList) toSend); + ByteBufList byteBufList = (ByteBufList) toSend; + byteBufList.retain(); request = byteBufList; cleanupActionFailedBeforeWrite = byteBufList::release; } From fd5431a585e1dfb2eb67fc0b99d3329b41691678 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 19:35:27 +0300 Subject: [PATCH 5/8] Fix bug in ByteStringUtil --- .../main/java/org/apache/bookkeeper/proto/ByteStringUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java index 4ff202922b4..39f31dc2cbf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java @@ -62,7 +62,7 @@ private static ByteString byteBufToByteString(ByteString aggregated, ByteBuf byt } else { ByteString piece; if (byteBuf.hasArray()) { - piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(), byteBuf.arrayOffset(), + piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(), byteBuf.readableBytes()); } else { piece = UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer()); From 6dfe88dfe9efba9dd843f6734723d10a31ecd2cd Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 19:38:09 +0300 Subject: [PATCH 6/8] Add ByteStringUtil optimization --- .../java/org/apache/bookkeeper/proto/ByteStringUtil.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java index 39f31dc2cbf..b26ac7b36ab 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java @@ -36,7 +36,10 @@ public class ByteStringUtil { public static ByteString byteBufListToByteString(ByteBufList bufList) { ByteString aggregated = null; for (int i = 0; i < bufList.size(); i++) { - aggregated = byteBufToByteString(aggregated, bufList.getBuffer(i)); + ByteBuf buffer = bufList.getBuffer(i); + if (buffer.readableBytes() > 0) { + aggregated = byteBufToByteString(aggregated, buffer); + } } return aggregated != null ? aggregated : ByteString.EMPTY; } @@ -54,6 +57,9 @@ public static ByteString byteBufToByteString(ByteBuf byteBuf) { // internal method to aggregate a ByteBuf into a single aggregated ByteString private static ByteString byteBufToByteString(ByteString aggregated, ByteBuf byteBuf) { + if (byteBuf.readableBytes() == 0) { + return ByteString.EMPTY; + } if (byteBuf.nioBufferCount() > 1) { for (ByteBuffer nioBuffer : byteBuf.nioBuffers()) { ByteString piece = UnsafeByteOperations.unsafeWrap(nioBuffer); From 74f97826ef268d7da6c630205f892b3e6db4ce45 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 21:49:20 +0300 Subject: [PATCH 7/8] Add handling for channel promises to AuthHandler --- .../apache/bookkeeper/proto/AuthHandler.java | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index 720e7279f5a..a11fad102dd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -355,7 +355,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) super.write(ctx, msg, promise); super.flush(ctx); } else { - waitingForAuth.add(msg); + addMsgAndPromiseToQueue(msg, promise); } } else if (msg instanceof BookieProtocol.Request) { // let auth messages through, queue the rest @@ -364,16 +364,26 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) super.write(ctx, msg, promise); super.flush(ctx); } else { - waitingForAuth.add(msg); + addMsgAndPromiseToQueue(msg, promise); } } else if (msg instanceof ByteBuf || msg instanceof ByteBufList) { - waitingForAuth.add(msg); + addMsgAndPromiseToQueue(msg, promise); } else { LOG.info("[{}] dropping write of message {}", ctx.channel(), msg); } } } + // Add the message and the associated promise to the queue. + // The promise is added to the same queue as the message without an additional wrapper object so + // that object allocations can be avoided. A similar solution is used in Netty codebase. + private void addMsgAndPromiseToQueue(Object msg, ChannelPromise promise) { + waitingForAuth.add(msg); + if (promise != null && !promise.isVoid()) { + waitingForAuth.add(promise); + } + } + long newTxnId() { return transactionIdGenerator.incrementAndGet(); } @@ -433,10 +443,19 @@ public void operationComplete(int rc, Void v) { if (rc == BKException.Code.OK) { synchronized (this) { authenticated = true; - Object msg = waitingForAuth.poll(); - while (msg != null) { - NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, msg); - msg = waitingForAuth.poll(); + while (true) { + Object msg = waitingForAuth.poll(); + if (msg == null) { + break; + } + ChannelPromise promise; + // check if the message has an associated promise as the next element in the queue + if (waitingForAuth.peek() instanceof ChannelPromise) { + promise = (ChannelPromise) waitingForAuth.poll(); + } else { + promise = ctx.voidPromise(); + } + ctx.writeAndFlush(msg, promise); } } } else { From b6127bfa7f027d51c3fb601770da63224a2ad63d Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 23 May 2024 23:13:21 +0300 Subject: [PATCH 8/8] Fix ByteBufListTest --- .../apache/bookkeeper/util/ByteBufList.java | 2 +- .../bookkeeper/util/ByteBufListTest.java | 30 +++++++++++++++---- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java index e1837b5ce66..133a37f0379 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java @@ -291,7 +291,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) // release the ByteBufList after the write operation is completed ReferenceCountUtil.safeRelease(b); // complete the promise passed as an argument unless it's a void promise - if (!promise.isVoid()) { + if (promise != null && !promise.isVoid()) { if (future.isSuccess()) { promise.setSuccess(); } else { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java index 88de17d0a9d..822d4a7c48e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -32,6 +34,8 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.channel.VoidChannelPromise; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; @@ -195,7 +199,8 @@ public void testEncoder() throws Exception { b2.writerIndex(b2.capacity()); ByteBufList buf = ByteBufList.get(b1, b2); - ChannelHandlerContext ctx = new MockChannelHandlerContext(); + Channel channel = mock(Channel.class); + MockChannelHandlerContext ctx = new MockChannelHandlerContext(channel); ByteBufList.ENCODER.write(ctx, buf, null); @@ -205,6 +210,15 @@ public void testEncoder() throws Exception { } class MockChannelHandlerContext implements ChannelHandlerContext { + private final Channel channel; + private final EventExecutor eventExecutor; + + public MockChannelHandlerContext(Channel channel) { + this.channel = channel; + eventExecutor = mock(EventExecutor.class); + when(eventExecutor.inEventLoop()).thenReturn(true); + } + @Override public ChannelFuture bind(SocketAddress localAddress) { return null; @@ -274,12 +288,18 @@ public ChannelFuture write(Object msg) { @Override public ChannelFuture write(Object msg, ChannelPromise promise) { ReferenceCountUtil.release(msg); + if (promise != null && !promise.isVoid()) { + promise.setSuccess(); + } return null; } @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { ReferenceCountUtil.release(msg); + if (promise != null && !promise.isVoid()) { + promise.setSuccess(); + } return null; } @@ -291,7 +311,7 @@ public ChannelFuture writeAndFlush(Object msg) { @Override public ChannelPromise newPromise() { - return null; + return new DefaultChannelPromise(channel, eventExecutor); } @Override @@ -311,17 +331,17 @@ public ChannelFuture newFailedFuture(Throwable cause) { @Override public ChannelPromise voidPromise() { - return null; + return new VoidChannelPromise(channel, false); } @Override public Channel channel() { - return null; + return channel; } @Override public EventExecutor executor() { - return null; + return eventExecutor; } @Override