From 7fbb5474f8abfd910e1efff548f1952ddd85502b Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Thu, 9 Jun 2022 18:15:28 +0800 Subject: [PATCH 1/5] Apply the backpressure changes on the V2 requests --- *Motivation* If one bookie is slow (not down, just slow), the BK client will the acks to the user that the entries are written after the first 2 acks. In the meantime, it will keep waiting for the 3rd bookie to respond. If the bookie responds within the timeout, the entries can now be dropped from memory, otherwise the write will timeout internally and it will get replayed to a new bookie. In the V3 request, we have [server-side backpressure](https://github.com/apache/bookkeeper/pull/1410) to impact the client-side behaviors. We should apply the same changes to the V2 request. That would help this [issue](https://github.com/apache/pulsar/issues/14861) to be resolved. *Modification* - Apply the change https://github.com/apache/bookkeeper/pull/1410 to V2 protocol --- .../proto/BookieRequestProcessor.java | 2 +- .../bookkeeper/proto/PacketProcessorBase.java | 56 +++++++++++++++++++ .../bookkeeper/proto/ReadEntryProcessor.java | 7 +-- .../bookkeeper/proto/WriteEntryProcessor.java | 7 ++- .../proto/BookieBackpressureForV2Test.java | 32 +++++++++++ 5 files changed, 95 insertions(+), 9 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 1015fcbf160..886556ff6d7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -642,7 +642,7 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Ch } getRequestStats().getAddEntryRejectedCounter().inc(); - write.sendResponse( + write.sendWriteReqResponse( BookieProtocol.ETOOMANYREQUESTS, ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), requestStats.getAddRequestStats()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index d416b9f1417..91f054c0055 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.bookkeeper.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +66,56 @@ protected boolean isVersionCompatible() { return true; } + protected void sendWriteReqResponse(int rc, Object response, OpStatsLogger statsLogger) { + sendResponse(rc, response, statsLogger); + requestProcessor.onAddRequestFinish(); + } + + protected void sendReadReqResponse(int rc, Object response, OpStatsLogger statsLogger, boolean throttle) { + if (throttle) { + sendResponseAndWait(rc, response, statsLogger); + } else { + sendResponse(rc, response, statsLogger); + } + requestProcessor.onReadRequestFinish(); + } + protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) { + final long writeNanos = MathUtils.nowInNano(); + + final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis(); + if (timeOut >= 0 && !channel.isWritable()) { + if (!requestProcessor.isBlacklisted(channel)) { + synchronized (channel) { + if (!channel.isWritable() && !requestProcessor.isBlacklisted(channel)) { + final long waitUntilNanos = writeNanos + TimeUnit.MILLISECONDS.toNanos(timeOut); + while (!channel.isWritable() && MathUtils.nowInNano() < waitUntilNanos) { + try { + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + if (!channel.isWritable()) { + requestProcessor.blacklistChannel(channel); + requestProcessor.handleNonWritableChannel(channel); + } + } + } + } + + if (!channel.isWritable()) { + LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel, + StringUtils.requestToString(request)); + requestProcessor.getRequestStats().getChannelWriteStats() + .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS); + statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); + return; + } else { + requestProcessor.invalidateBlacklist(channel); + } + } + if (channel.isActive()) { channel.writeAndFlush(response, channel.voidPromise()); } else { @@ -106,6 +156,12 @@ public void safeRun() { sendResponse(BookieProtocol.EBADVERSION, ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request), requestProcessor.getRequestStats().getReadRequestStats()); + if (request instanceof BookieProtocol.ReadRequest) { + requestProcessor.onReadRequestFinish(); + } + if (request instanceof BookieProtocol.AddRequest) { + requestProcessor.onAddRequestFinish(); + } return; } processPacket(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 3c7d2fa269b..7c8c2c84938 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -52,6 +52,7 @@ public static ReadEntryProcessor create(ReadRequest request, rep.init(request, channel, requestProcessor); rep.fenceThreadPool = fenceThreadPool; rep.throttleReadResponses = throttleReadResponses; + requestProcessor.onReadRequestStart(channel); return rep; } @@ -132,11 +133,7 @@ private void sendResponse(ByteBuf data, int errorCode, long startTimeNanos) { response = ResponseBuilder.buildErrorResponse(errorCode, request); } - if (throttleReadResponses) { - sendResponseAndWait(errorCode, response, stats.getReadRequestStats()); - } else { - sendResponse(errorCode, response, stats.getReadRequestStats()); - } + sendReadReqResponse(errorCode, response, stats.getReadRequestStats(), throttleReadResponses); recycle(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 622016b146b..a0e59be6bfd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -53,6 +53,7 @@ public static WriteEntryProcessor create(ParsedAddRequest request, Channel chann BookieRequestProcessor requestProcessor) { WriteEntryProcessor wep = RECYCLER.get(); wep.init(request, channel, requestProcessor); + requestProcessor.onAddRequestStart(channel); return wep; } @@ -62,7 +63,7 @@ protected void processPacket() { && !(request.isHighPriority() && requestProcessor.getBookie().isAvailableForHighPriorityWrites())) { LOG.warn("BookieServer is running in readonly mode," + " so rejecting the request from the client!"); - sendResponse(BookieProtocol.EREADONLY, + sendWriteReqResponse(BookieProtocol.EREADONLY, ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request), requestProcessor.getRequestStats().getAddRequestStats()); request.release(); @@ -108,7 +109,7 @@ protected void processPacket() { if (rc != BookieProtocol.EOK) { requestProcessor.getRequestStats().getAddEntryStats() .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); - sendResponse(rc, + sendWriteReqResponse(rc, ResponseBuilder.buildErrorResponse(rc, request), requestProcessor.getRequestStats().getAddRequestStats()); request.recycle(); @@ -125,7 +126,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, requestProcessor.getRequestStats().getAddEntryStats() .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } - sendResponse(rc, + sendWriteReqResponse(rc, ResponseBuilder.buildAddResponse(request), requestProcessor.getRequestStats().getAddRequestStats()); request.recycle(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java new file mode 100644 index 00000000000..024589cd16d --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.proto; + +import org.junit.Before; + +public class BookieBackpressureForV2Test extends BookieBackpressureTest { + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + baseClientConf.setUseV2WireProtocol(true); + confByIndex(0).setReadWorkerThreadsThrottlingEnabled(false); + } +} From e10e36124e15402ec0f5d5265a9aa0bdb55b43ac Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Thu, 9 Jun 2022 18:51:02 +0800 Subject: [PATCH 2/5] Fix the style issue --- .../apache/bookkeeper/proto/BookieBackpressureForV2Test.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java index 024589cd16d..924f5464e54 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java @@ -20,6 +20,9 @@ import org.junit.Before; +/** + * Tests for bckpressure handling on the server side with V2 protocol. + */ public class BookieBackpressureForV2Test extends BookieBackpressureTest { @Before From df251e4dee50819ecc6b6ded47057b1a628e1649 Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Fri, 10 Jun 2022 08:13:18 +0800 Subject: [PATCH 3/5] Fix the failure tests --- .../org/apache/bookkeeper/proto/WriteEntryProcessorTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index b249d4847c7..e2c141b741d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -72,6 +72,7 @@ public void setup() { when(requestProcessor.getBookie()).thenReturn(bookie); when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE)); when(channel.isActive()).thenReturn(true); + when(channel.isWritable()).thenReturn(true); processor = WriteEntryProcessor.create( request, channel, From b1adf64a7b086b5f15bcb67b8fa8e204a31d7dde Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Fri, 10 Jun 2022 09:26:19 +0800 Subject: [PATCH 4/5] Remove unused change --- .../org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java | 1 - 1 file changed, 1 deletion(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java index 924f5464e54..818c6f829a5 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java @@ -30,6 +30,5 @@ public class BookieBackpressureForV2Test extends BookieBackpressureTest { public void setUp() throws Exception { super.setUp(); baseClientConf.setUseV2WireProtocol(true); - confByIndex(0).setReadWorkerThreadsThrottlingEnabled(false); } } From cbf968a33bbb39e55dc5c5e61557b6eb50ae143f Mon Sep 17 00:00:00 2001 From: Yong Zhang Date: Fri, 10 Jun 2022 15:02:48 +0800 Subject: [PATCH 5/5] Fix the tests --- .../apache/bookkeeper/proto/BookieBackpressureForV2Test.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java index 818c6f829a5..775844d05e6 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java @@ -30,5 +30,7 @@ public class BookieBackpressureForV2Test extends BookieBackpressureTest { public void setUp() throws Exception { super.setUp(); baseClientConf.setUseV2WireProtocol(true); + // the backpressure will bloc the read response, disable it to let it use backpressure mechanism + confByIndex(0).setReadWorkerThreadsThrottlingEnabled(false); } }