diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 1015fcbf160..886556ff6d7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -642,7 +642,7 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Ch } getRequestStats().getAddEntryRejectedCounter().inc(); - write.sendResponse( + write.sendWriteReqResponse( BookieProtocol.ETOOMANYREQUESTS, ResponseBuilder.buildErrorResponse(BookieProtocol.ETOOMANYREQUESTS, r), requestStats.getAddRequestStats()); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index d416b9f1417..91f054c0055 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.bookkeeper.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +66,56 @@ protected boolean isVersionCompatible() { return true; } + protected void sendWriteReqResponse(int rc, Object response, OpStatsLogger statsLogger) { + sendResponse(rc, response, statsLogger); + requestProcessor.onAddRequestFinish(); + } + + protected void sendReadReqResponse(int rc, Object response, OpStatsLogger statsLogger, boolean throttle) { + if (throttle) { + sendResponseAndWait(rc, response, statsLogger); + } else { + sendResponse(rc, response, statsLogger); + } + requestProcessor.onReadRequestFinish(); + } + protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) { + final long writeNanos = MathUtils.nowInNano(); + + final long timeOut = requestProcessor.getWaitTimeoutOnBackpressureMillis(); + if (timeOut >= 0 && !channel.isWritable()) { + if (!requestProcessor.isBlacklisted(channel)) { + synchronized (channel) { + if (!channel.isWritable() && !requestProcessor.isBlacklisted(channel)) { + final long waitUntilNanos = writeNanos + TimeUnit.MILLISECONDS.toNanos(timeOut); + while (!channel.isWritable() && MathUtils.nowInNano() < waitUntilNanos) { + try { + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException e) { + break; + } + } + if (!channel.isWritable()) { + requestProcessor.blacklistChannel(channel); + requestProcessor.handleNonWritableChannel(channel); + } + } + } + } + + if (!channel.isWritable()) { + LOGGER.warn("cannot write response to non-writable channel {} for request {}", channel, + StringUtils.requestToString(request)); + requestProcessor.getRequestStats().getChannelWriteStats() + .registerFailedEvent(MathUtils.elapsedNanos(writeNanos), TimeUnit.NANOSECONDS); + statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); + return; + } else { + requestProcessor.invalidateBlacklist(channel); + } + } + if (channel.isActive()) { channel.writeAndFlush(response, channel.voidPromise()); } else { @@ -106,6 +156,12 @@ public void safeRun() { sendResponse(BookieProtocol.EBADVERSION, ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request), requestProcessor.getRequestStats().getReadRequestStats()); + if (request instanceof BookieProtocol.ReadRequest) { + requestProcessor.onReadRequestFinish(); + } + if (request instanceof BookieProtocol.AddRequest) { + requestProcessor.onAddRequestFinish(); + } return; } processPacket(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 3c7d2fa269b..7c8c2c84938 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -52,6 +52,7 @@ public static ReadEntryProcessor create(ReadRequest request, rep.init(request, channel, requestProcessor); rep.fenceThreadPool = fenceThreadPool; rep.throttleReadResponses = throttleReadResponses; + requestProcessor.onReadRequestStart(channel); return rep; } @@ -132,11 +133,7 @@ private void sendResponse(ByteBuf data, int errorCode, long startTimeNanos) { response = ResponseBuilder.buildErrorResponse(errorCode, request); } - if (throttleReadResponses) { - sendResponseAndWait(errorCode, response, stats.getReadRequestStats()); - } else { - sendResponse(errorCode, response, stats.getReadRequestStats()); - } + sendReadReqResponse(errorCode, response, stats.getReadRequestStats(), throttleReadResponses); recycle(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index 622016b146b..a0e59be6bfd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -53,6 +53,7 @@ public static WriteEntryProcessor create(ParsedAddRequest request, Channel chann BookieRequestProcessor requestProcessor) { WriteEntryProcessor wep = RECYCLER.get(); wep.init(request, channel, requestProcessor); + requestProcessor.onAddRequestStart(channel); return wep; } @@ -62,7 +63,7 @@ protected void processPacket() { && !(request.isHighPriority() && requestProcessor.getBookie().isAvailableForHighPriorityWrites())) { LOG.warn("BookieServer is running in readonly mode," + " so rejecting the request from the client!"); - sendResponse(BookieProtocol.EREADONLY, + sendWriteReqResponse(BookieProtocol.EREADONLY, ResponseBuilder.buildErrorResponse(BookieProtocol.EREADONLY, request), requestProcessor.getRequestStats().getAddRequestStats()); request.release(); @@ -108,7 +109,7 @@ protected void processPacket() { if (rc != BookieProtocol.EOK) { requestProcessor.getRequestStats().getAddEntryStats() .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); - sendResponse(rc, + sendWriteReqResponse(rc, ResponseBuilder.buildErrorResponse(rc, request), requestProcessor.getRequestStats().getAddRequestStats()); request.recycle(); @@ -125,7 +126,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, requestProcessor.getRequestStats().getAddEntryStats() .registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS); } - sendResponse(rc, + sendWriteReqResponse(rc, ResponseBuilder.buildAddResponse(request), requestProcessor.getRequestStats().getAddRequestStats()); request.recycle(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java new file mode 100644 index 00000000000..775844d05e6 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.proto; + +import org.junit.Before; + +/** + * Tests for bckpressure handling on the server side with V2 protocol. + */ +public class BookieBackpressureForV2Test extends BookieBackpressureTest { + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + baseClientConf.setUseV2WireProtocol(true); + // the backpressure will bloc the read response, disable it to let it use backpressure mechanism + confByIndex(0).setReadWorkerThreadsThrottlingEnabled(false); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index b249d4847c7..e2c141b741d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -72,6 +72,7 @@ public void setup() { when(requestProcessor.getBookie()).thenReturn(bookie); when(requestProcessor.getRequestStats()).thenReturn(new RequestStats(NullStatsLogger.INSTANCE)); when(channel.isActive()).thenReturn(true); + when(channel.isWritable()).thenReturn(true); processor = WriteEntryProcessor.create( request, channel,