From 7e0abe14c326881aaf7559efd16f241b8f4191df Mon Sep 17 00:00:00 2001 From: Anthony Petrov Date: Wed, 15 Apr 2026 13:43:38 -0700 Subject: [PATCH] fix: use Http2ClientStream.sendPing() again Signed-off-by: Anthony Petrov --- .../pbj/grpc/client/helidon/PbjGrpcCall.java | 60 +------------------ .../grpc/client/helidon/PbjGrpcCallTest.java | 5 +- 2 files changed, 4 insertions(+), 61 deletions(-) diff --git a/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java b/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java index 84c7df8d..7f75cdae 100644 --- a/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java +++ b/pbj-core/pbj-grpc-client-helidon/src/main/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCall.java @@ -17,14 +17,10 @@ import io.helidon.http.Headers; import io.helidon.http.WritableHeaders; import io.helidon.http.http2.Http2FrameData; -import io.helidon.http.http2.Http2FrameHeader; import io.helidon.http.http2.Http2Headers; -import io.helidon.http.http2.Http2Ping; import io.helidon.http.http2.Http2StreamState; import io.helidon.webclient.http2.Http2ClientStream; import io.helidon.webclient.http2.StreamTimeoutException; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -184,58 +180,6 @@ private boolean isStreamOpen() { && clientStream.streamState() != Http2StreamState.CLOSED; } - /** - * Send a ping to the server. - *

- * Do NOT use Http2ClientStream.sendPing()! It works once. A second ping results in sending garbage frames - * to the server (indirectly), and the server closes the connection. The exact cause is still unknown, but it may - * be related to the usage of this connection's flowControl object for sending the pings which may - * interfere with the regular data transfers occurring via this same connection concurrently with the ping. - * Another reason for this may be the fact that it uses a static HTTP2_PING object for sending pings, but - * it never rewind()'s the buffer that holds the ping payload, so the server may read bytes from a subsequent - * regular data frame and interpret them as the ping payload, which should break the HTTP2 connection as a whole. - *

- * There's Http2ClientConnection.ping() method that explicitly uses the FlowControl.Outbound.NOOP for sending - * new ping objects. However, that method is package-private. - *

- * So we implement our own sendPing() here that uses new Http2Ping objects and doesn't use the flowControl. - *

- * NOTE: Http2ClientStream methods use an Http2ConnectionWriter object via Http2ClientConnection.writer() - * to write data, and it's a wrapper around the ClientConnection's DataWriter object. - * And the Http2ConnectionWriter has some additional synchronization around DataWriter.write() calls. - * However, ironically, it doesn't synchronize access to the flowControl object. Regardless, there's no public - * methods to obtain a reference to the Http2ConnectionWriter or its internal lock. So we have to write - * to the ClientConnection's DataWriter object directly. Stress-testing hasn't revealed any thread-races so far. - *

- * It's difficult to imagine a situation where the thread-race could occur. Perhaps a single PbjGrpcClient - * (aka a single HTTP2 connection) and two streaming PbjGrpcCalls (aka HTTP2 streams) open concurrently, - * one being very chatty and another one being very silent. The latter may start sending pings while the former - * is sending requests to the server. However, this scenario seems very rare. If we ever encounter this issue, - * then it's easy to work-around by creating separate PbjGrpcClients for the two calls on the client side. - * To fix it, ideally we'd work with Helidon to expose the necessary APIs for synchronous writes. Alternatively, - * we could introduce a PbjGrpcClient-level outgoing queue and send all requests and pings through it as - * a work-around. However, this work-around may not fully cover the issue because Helidon can write window update - * frames for the flowControl changes concurrently still as it reads data from the stream/socket. - */ - private void sendPing() { - final Http2Ping ping = Http2Ping.create(); - final Http2FrameData frameData = ping.toFrameData(); - final Http2FrameHeader frameHeader = frameData.header(); - if (frameHeader.length() == 0) { - throw new IllegalStateException("Ping with zero length. This should never happen."); - } else { - final BufferData headerData = frameHeader.write(); - final BufferData data = frameData.data().copy(); - try { - grpcClient.getClientConnection().writer().writeNow(BufferData.create(headerData, data)); - } catch (IllegalStateException e) { - // It may throw IllegalStateException: Attempt to call writer() on a closed connection - // But callers usually expect an UncheckedIOException: - throw new UncheckedIOException(new IOException("sendPing failed", e)); - } - } - } - private void receiveRepliesLoop() { try { Http2Headers http2Headers = null; @@ -251,7 +195,7 @@ private void receiveRepliesLoop() { // if the server died. // FUTURE WORK: consider a separate KeepAlive timeout for these pings, so that we don't flood the // network. - sendPing(); + clientStream.sendPing(); } } while (http2Headers == null && isStreamOpen()); @@ -269,7 +213,7 @@ private void receiveRepliesLoop() { frameData = clientStream.readOne(grpcClient.getConfig().readTimeout()); } catch (StreamTimeoutException e) { // Check if the connection is alive. See a comment above about the KeepAlive timeout. - sendPing(); + clientStream.sendPing(); // FUTURE WORK: implement an uber timeout to return continue; } diff --git a/pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCallTest.java b/pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCallTest.java index 2a276008..c2162b49 100644 --- a/pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCallTest.java +++ b/pbj-core/pbj-grpc-client-helidon/src/test/java/com/hedera/pbj/grpc/client/helidon/PbjGrpcCallTest.java @@ -237,8 +237,7 @@ public void testReceiveRepliesLoopPingAndStreamClosed() { runnable.run(); - // A ping: - verify(dataWriter, times(1)).writeNow(any(BufferData.class)); + verify(grpcClientStream, times(1)).sendPing(); verify(pipeline, times(1)).onComplete(); verifyNoMoreInteractions(pipeline); } @@ -292,7 +291,7 @@ public void testReceiveRepliesLoopSingleReply(final boolean isTimeout) throws Ex verify(pipeline, times(1)).onNext(reply); verify(pipeline, times(1)).onComplete(); // A ping: - if (isTimeout) verify(dataWriter, times(1)).writeNow(any(BufferData.class)); + if (isTimeout) verify(grpcClientStream, times(1)).sendPing(); verifyNoMoreInteractions(pipeline); }