diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index f923b61ad50..eee1f52644e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -41,6 +41,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; import org.apache.bookkeeper.util.ByteBufList; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -228,7 +229,7 @@ static class ClientSideHandler extends ChannelDuplexHandler { final ClientAuthProvider.Factory authProviderFactory; ClientAuthProvider authProvider; final AtomicLong transactionIdGenerator; - final Queue waitingForAuth = new ConcurrentLinkedQueue<>(); + final Queue> waitingForAuth = new ConcurrentLinkedQueue<>(); final ClientConnectionPeer connectionPeer; private final boolean isUsingV2Protocol; @@ -349,7 +350,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) super.write(ctx, msg, promise); super.flush(ctx); } else { - waitingForAuth.add(msg); + waitingForAuth.add(Pair.of(msg, promise)); } } else if (msg instanceof BookieProtocol.Request) { // let auth messages through, queue the rest @@ -358,10 +359,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) super.write(ctx, msg, promise); super.flush(ctx); } else { - waitingForAuth.add(msg); + waitingForAuth.add(Pair.of(msg, promise)); } } else if (msg instanceof ByteBuf || msg instanceof ByteBufList) { - waitingForAuth.add(msg); + waitingForAuth.add(Pair.of(msg, promise)); } else { LOG.info("[{}] dropping write of message {}", ctx.channel(), msg); } @@ -427,10 +428,10 @@ public void operationComplete(int rc, Void v) { if (rc == BKException.Code.OK) { synchronized (this) { authenticated = true; - Object msg = waitingForAuth.poll(); - while (msg != null) { - ctx.writeAndFlush(msg); - msg = waitingForAuth.poll(); + Pair pair = waitingForAuth.poll(); + while (pair != null && pair.getLeft() != null) { + ctx.writeAndFlush(pair.getLeft(), pair.getRight()); + pair = waitingForAuth.poll(); } } } else {