diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index 720e7279f5a..a11fad102dd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -355,7 +355,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) super.write(ctx, msg, promise); super.flush(ctx); } else { - waitingForAuth.add(msg); + addMsgAndPromiseToQueue(msg, promise); } } else if (msg instanceof BookieProtocol.Request) { // let auth messages through, queue the rest @@ -364,16 +364,26 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) super.write(ctx, msg, promise); super.flush(ctx); } else { - waitingForAuth.add(msg); + addMsgAndPromiseToQueue(msg, promise); } } else if (msg instanceof ByteBuf || msg instanceof ByteBufList) { - waitingForAuth.add(msg); + addMsgAndPromiseToQueue(msg, promise); } else { LOG.info("[{}] dropping write of message {}", ctx.channel(), msg); } } } + // Add the message and the associated promise to the queue. + // The promise is added to the same queue as the message without an additional wrapper object so + // that object allocations can be avoided. A similar solution is used in Netty codebase. 
+ private void addMsgAndPromiseToQueue(Object msg, ChannelPromise promise) { + waitingForAuth.add(msg); + if (promise != null && !promise.isVoid()) { + waitingForAuth.add(promise); + } + } + long newTxnId() { return transactionIdGenerator.incrementAndGet(); } @@ -433,10 +443,19 @@ public void operationComplete(int rc, Void v) { if (rc == BKException.Code.OK) { synchronized (this) { authenticated = true; - Object msg = waitingForAuth.poll(); - while (msg != null) { - NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, msg); - msg = waitingForAuth.poll(); + while (true) { + Object msg = waitingForAuth.poll(); + if (msg == null) { + break; + } + ChannelPromise promise; + // check if the message has an associated promise as the next element in the queue + if (waitingForAuth.peek() instanceof ChannelPromise) { + promise = (ChannelPromise) waitingForAuth.poll(); + } else { + promise = ctx.voidPromise(); + } + ctx.writeAndFlush(msg, promise); } } } else { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java index c305a51ea42..5dde52bc3f5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java @@ -260,18 +260,20 @@ public void writeLac(final BookieId addr, final long ledgerId, final byte[] mast toSend.retain(); client.obtain((rc, pcbc) -> { - if (rc != BKException.Code.OK) { - try { - executor.executeOrdered(ledgerId, - () -> cb.writeLacComplete(rc, ledgerId, addr, ctx)); - } catch (RejectedExecutionException re) { - cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); + try { + if (rc != BKException.Code.OK) { + try { + executor.executeOrdered(ledgerId, + () -> cb.writeLacComplete(rc, ledgerId, addr, ctx)); + } catch (RejectedExecutionException re) { + 
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); + } + } else { + pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); } - } else { - pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); + } finally { + ReferenceCountUtil.release(toSend); } - - ReferenceCountUtil.release(toSend); }, ledgerId, useV3Enforced); } @@ -392,14 +394,16 @@ static ChannelReadyForAddEntryCallback create( @Override public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx); - } else { - pcbc.addEntry(ledgerId, masterKey, entryId, - toSend, cb, ctx, options, allowFastFail, writeFlags); + try { + if (rc != BKException.Code.OK) { + bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx); + } else { + pcbc.addEntry(ledgerId, masterKey, entryId, + toSend, cb, ctx, options, allowFastFail, writeFlags); + } + } finally { + ReferenceCountUtil.release(toSend); } - - ReferenceCountUtil.release(toSend); recycle(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java new file mode 100644 index 00000000000..b26ac7b36ab --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ByteStringUtil.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.proto; + +import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; +import io.netty.buffer.ByteBuf; +import java.nio.ByteBuffer; +import org.apache.bookkeeper.util.ByteBufList; + +public class ByteStringUtil { + + /** + * Wrap the internal buffers of a ByteBufList into a single ByteString. + * The lifecycle of the wrapped ByteString is tied to the ByteBufList. + * + * @param bufList ByteBufList to wrap + * @return ByteString wrapping the internal buffers of the ByteBufList + */ + public static ByteString byteBufListToByteString(ByteBufList bufList) { + ByteString aggregated = null; + for (int i = 0; i < bufList.size(); i++) { + ByteBuf buffer = bufList.getBuffer(i); + if (buffer.readableBytes() > 0) { + aggregated = byteBufToByteString(aggregated, buffer); + } + } + return aggregated != null ? aggregated : ByteString.EMPTY; + } + + /** + * Wrap the internal buffers of a ByteBuf into a single ByteString. + * The lifecycle of the wrapped ByteString is tied to the ByteBuf. 
+ * + * @param byteBuf ByteBuf to wrap + * @return ByteString wrapping the internal buffers of the ByteBuf + */ + public static ByteString byteBufToByteString(ByteBuf byteBuf) { + return byteBufToByteString(null, byteBuf); + } + + // internal method to aggregate a ByteBuf into a single aggregated ByteString + private static ByteString byteBufToByteString(ByteString aggregated, ByteBuf byteBuf) { + if (byteBuf.readableBytes() == 0) { + return ByteString.EMPTY; + } + if (byteBuf.nioBufferCount() > 1) { + for (ByteBuffer nioBuffer : byteBuf.nioBuffers()) { + ByteString piece = UnsafeByteOperations.unsafeWrap(nioBuffer); + aggregated = (aggregated == null) ? piece : aggregated.concat(piece); + } + } else { + ByteString piece; + if (byteBuf.hasArray()) { + piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(), + byteBuf.readableBytes()); + } else { + piece = UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer()); + } + aggregated = (aggregated == null) ? 
piece : aggregated.concat(piece); + } + return aggregated; + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 3a7af5b99a6..ca1448c7682 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -699,14 +699,10 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB .setVersion(ProtocolVersion.VERSION_THREE) .setOperation(OperationType.WRITE_LAC) .setTxnId(txnId); - ByteString body; - if (toSend.hasArray()) { - body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes()); - } else if (toSend.size() == 1) { - body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer()); - } else { - body = UnsafeByteOperations.unsafeWrap(toSend.toArray()); - } + ByteString body = ByteStringUtil.byteBufListToByteString(toSend); + toSend.retain(); + Runnable cleanupActionFailedBeforeWrite = toSend::release; + Runnable cleanupActionAfterWrite = cleanupActionFailedBeforeWrite; WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder() .setLedgerId(ledgerId) .setLac(lac) @@ -717,7 +713,8 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB .setHeader(headerBuilder) .setWriteLacRequest(writeLacBuilder) .build(); - writeAndFlush(channel, completionKey, writeLacRequest); + writeAndFlush(channel, completionKey, writeLacRequest, false, cleanupActionFailedBeforeWrite, + cleanupActionAfterWrite); } void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) { @@ -776,6 +773,8 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen Object ctx, final int options, boolean allowFastFail, final EnumSet writeFlags) { Object request = null; CompletionKey 
completionKey = null; + Runnable cleanupActionFailedBeforeWrite = null; + Runnable cleanupActionAfterWrite = null; if (useV2WireProtocol) { if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) { LOG.error("invalid writeflags {} for v2 protocol", writeFlags); @@ -785,9 +784,14 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY); if (toSend instanceof ByteBuf) { - request = ((ByteBuf) toSend).retainedDuplicate(); + ByteBuf byteBuf = ((ByteBuf) toSend).retainedDuplicate(); + request = byteBuf; + cleanupActionFailedBeforeWrite = byteBuf::release; } else { - request = ByteBufList.clone((ByteBufList) toSend); + ByteBufList byteBufList = (ByteBufList) toSend; + byteBufList.retain(); + request = byteBufList; + cleanupActionFailedBeforeWrite = byteBufList::release; } } else { final long txnId = getTxnId(); @@ -802,19 +806,11 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE); } - ByteString body = null; ByteBufList bufToSend = (ByteBufList) toSend; - - if (bufToSend.hasArray()) { - body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(), - bufToSend.readableBytes()); - } else { - for (int i = 0; i < bufToSend.size(); i++) { - ByteString piece = UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer()); - // use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs - body = (body == null) ? 
piece : body.concat(piece); - } - } + ByteString body = ByteStringUtil.byteBufListToByteString(bufToSend); + bufToSend.retain(); + cleanupActionFailedBeforeWrite = bufToSend::release; + cleanupActionAfterWrite = cleanupActionFailedBeforeWrite; AddRequest.Builder addBuilder = AddRequest.newBuilder() .setLedgerId(ledgerId) .setEntryId(entryId) @@ -839,17 +835,9 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen putCompletionKeyValue(completionKey, acquireAddCompletion(completionKey, cb, ctx, ledgerId, entryId)); - final Channel c = channel; - if (c == null) { - // Manually release the binary data(variable "request") that we manually created when it can not be sent out - // because the channel is switching. - errorOut(completionKey); - ReferenceCountUtil.release(request); - return; - } else { - // addEntry times out on backpressure - writeAndFlush(c, completionKey, request, allowFastFail); - } + // addEntry times out on backpressure + writeAndFlush(channel, completionKey, request, allowFastFail, cleanupActionFailedBeforeWrite, + cleanupActionAfterWrite); } public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) { @@ -1004,7 +992,7 @@ private void readEntryInternal(final long ledgerId, ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId); putCompletionKeyValue(completionKey, readCompletion); - writeAndFlush(channel, completionKey, request, allowFastFail); + writeAndFlush(channel, completionKey, request, allowFastFail, null, null); } public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) { @@ -1126,17 +1114,20 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio private void writeAndFlush(final Channel channel, final CompletionKey key, final Object request) { - writeAndFlush(channel, key, request, false); + writeAndFlush(channel, key, request, false, null, null); } private void writeAndFlush(final Channel channel, final 
CompletionKey key, final Object request, - final boolean allowFastFail) { + final boolean allowFastFail, final Runnable cleanupActionFailedBeforeWrite, + final Runnable cleanupActionAfterWrite) { if (channel == null) { LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request)); errorOut(key); - ReferenceCountUtil.release(request); + if (cleanupActionFailedBeforeWrite != null) { + cleanupActionFailedBeforeWrite.run(); + } return; } @@ -1151,7 +1142,9 @@ private void writeAndFlush(final Channel channel, StringUtils.requestToString(request)); errorOut(key, BKException.Code.TooManyRequestsException); - ReferenceCountUtil.release(request); + if (cleanupActionFailedBeforeWrite != null) { + cleanupActionFailedBeforeWrite.run(); + } return; } @@ -1159,23 +1152,29 @@ private void writeAndFlush(final Channel channel, final long startTime = MathUtils.nowInNano(); ChannelPromise promise = channel.newPromise().addListener(future -> { - if (future.isSuccess()) { - nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); - CompletionValue completion = completionObjects.get(key); - if (completion != null) { - completion.setOutstanding(); + try { + if (future.isSuccess()) { + nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); + CompletionValue completion = completionObjects.get(key); + if (completion != null) { + completion.setOutstanding(); + } + } else { + nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); + } + } finally { + if (cleanupActionAfterWrite != null) { + cleanupActionAfterWrite.run(); } - } else { - nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } }); channel.writeAndFlush(request, promise); } catch (Throwable e) { LOG.warn("Operation {} failed", StringUtils.requestToString(request), e); errorOut(key); - // If the request goes into the writeAndFlush, it should be handled 
well by Netty. So all the exceptions we - // get here, we can release the request. - ReferenceCountUtil.release(request); + if (cleanupActionFailedBeforeWrite != null) { + cleanupActionFailedBeforeWrite.run(); + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java index f690b99197b..1f85a16db8b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java @@ -286,19 +286,29 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (msg instanceof ByteBufList) { ByteBufList b = (ByteBufList) msg; - try { - // Write each buffer individually on the socket. The retain() here is needed to preserve the fact - // that ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased - // and it gets written multiple times, the individual buffers refcount should be reflected as well. - int buffersCount = b.buffers.size(); - for (int i = 0; i < buffersCount; i++) { - ByteBuf bx = b.buffers.get(i); - // Last buffer will carry on the final promise to notify when everything was written on the - // socket - ctx.write(bx.retainedDuplicate(), i == (buffersCount - 1) ? promise : ctx.voidPromise()); + ChannelPromise compositePromise = ctx.newPromise(); + compositePromise.addListener(future -> { + // release the ByteBufList after the write operation is completed + ReferenceCountUtil.safeRelease(b); + // complete the promise passed as an argument unless it's a void promise + if (promise != null && !promise.isVoid()) { + if (future.isSuccess()) { + promise.setSuccess(); + } else { + promise.setFailure(future.cause()); + } } - } finally { - ReferenceCountUtil.release(b); + }); + + // Write each buffer individually on the socket. 
The retain() here is needed to preserve the fact + // that ByteBuf are automatically released after a write. If the ByteBufList ref count is increased + // and it gets written multiple times, the individual buffers refcount should be reflected as well. + int buffersCount = b.buffers.size(); + for (int i = 0; i < buffersCount; i++) { + ByteBuf bx = b.buffers.get(i); + // Last buffer will carry on the final promise to notify when everything was written on the + // socket + ctx.write(bx.retainedDuplicate(), i == (buffersCount - 1) ? compositePromise : ctx.voidPromise()); } } else { ctx.write(msg, promise); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java index 88de17d0a9d..822d4a7c48e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -32,6 +34,8 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.channel.VoidChannelPromise; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; @@ -195,7 +199,8 @@ public void testEncoder() throws Exception { b2.writerIndex(b2.capacity()); ByteBufList buf = ByteBufList.get(b1, b2); - ChannelHandlerContext ctx = new MockChannelHandlerContext(); + Channel channel = mock(Channel.class); + MockChannelHandlerContext ctx = new MockChannelHandlerContext(channel); ByteBufList.ENCODER.write(ctx, buf, null); @@ -205,6 +210,15 @@ 
public void testEncoder() throws Exception { } class MockChannelHandlerContext implements ChannelHandlerContext { + private final Channel channel; + private final EventExecutor eventExecutor; + + public MockChannelHandlerContext(Channel channel) { + this.channel = channel; + eventExecutor = mock(EventExecutor.class); + when(eventExecutor.inEventLoop()).thenReturn(true); + } + @Override public ChannelFuture bind(SocketAddress localAddress) { return null; @@ -274,12 +288,18 @@ public ChannelFuture write(Object msg) { @Override public ChannelFuture write(Object msg, ChannelPromise promise) { ReferenceCountUtil.release(msg); + if (promise != null && !promise.isVoid()) { + promise.setSuccess(); + } return null; } @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { ReferenceCountUtil.release(msg); + if (promise != null && !promise.isVoid()) { + promise.setSuccess(); + } return null; } @@ -291,7 +311,7 @@ public ChannelFuture writeAndFlush(Object msg) { @Override public ChannelPromise newPromise() { - return null; + return new DefaultChannelPromise(channel, eventExecutor); } @Override @@ -311,17 +331,17 @@ public ChannelFuture newFailedFuture(Throwable cause) { @Override public ChannelPromise voidPromise() { - return null; + return new VoidChannelPromise(channel, false); } @Override public Channel channel() { - return null; + return channel; } @Override public EventExecutor executor() { - return null; + return eventExecutor; } @Override