From 6602f13cca8a24554b8c98931fabb1d72527b7e5 Mon Sep 17 00:00:00 2001 From: doxlik Date: Tue, 31 Mar 2026 21:06:30 +0400 Subject: [PATCH 1/5] Increment inflight counter on prepare-with-parametes-types query Signed-off-by: doxlik --- .../java/io/vertx/sqlclient/codec/SocketConnectionBase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java index 8b1b9a973..9fd755f25 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java @@ -336,6 +336,8 @@ private void fireCommandMessage(ChannelHandlerContext chctx, CommandMessage Date: Fri, 10 Apr 2026 18:14:26 +0400 Subject: [PATCH 2/5] test --- ...paredStatementRepreparePipeliningTest.java | 150 ++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementRepreparePipeliningTest.java diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementRepreparePipeliningTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementRepreparePipeliningTest.java new file mode 100644 index 000000000..fc2f8cdfc --- /dev/null +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementRepreparePipeliningTest.java @@ -0,0 +1,150 @@ +package io.vertx.tests.pgclient; + +import io.vertx.core.Future; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgConnection; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.Tuple; +import io.vertx.tests.sqlclient.ProxyServer; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntConsumer; + +public class PreparedStatementRepreparePipeliningTest extends PreparedStatementTestBase { + + @Override + protected PgConnectOptions options() { + return new PgConnectOptions(options) + .setPipeliningLimit(1); + } + + @Test + public void testReprepareDoesNotBypassPipeliningLimitWithEnabledCache(TestContext ctx) { + testReprepareDoesNotBypassPipeliningLimit(ctx, true); + } + + @Test + public void testReprepareDoesNotBypassPipeliningLimitWithDisabledCache(TestContext ctx) { + testReprepareDoesNotBypassPipeliningLimit(ctx, false); + } + + private void testReprepareDoesNotBypassPipeliningLimit(TestContext ctx, boolean cachePreparedStatements) { + Async async = ctx.async(); + + PgConnectOptions backend = options().setCachePreparedStatements(cachePreparedStatements); + ProxyServer proxy = ProxyServer.create(vertx, backend.getPort(), backend.getHost()); + + AtomicBoolean observe = new AtomicBoolean(); + AtomicInteger readyForQueryCount = new AtomicInteger(); + AtomicBoolean sawSecondQuery = new AtomicBoolean(); + AtomicBoolean secondQueryTooEarly = new AtomicBoolean(); + + TaggedMessageScanner frontendScanner = new TaggedMessageScanner(); + TaggedMessageScanner backendScanner = new TaggedMessageScanner(); + + proxy.proxyHandler(conn -> { + conn.clientHandler(buff -> { + if (observe.get()) { + frontendScanner.handle(buff, tag -> { + if (tag == 'Q') { + sawSecondQuery.set(true); + if (readyForQueryCount.get() < 3) { + secondQueryTooEarly.set(true); + } + } + }); + } + conn.serverSocket().write(buff); + }); + + conn.serverHandler(buff -> { + if (observe.get()) { + backendScanner.handle(buff, tag -> { + if (tag == 'Z') { + readyForQueryCount.incrementAndGet(); + } + }); + } + conn.clientSocket().write(buff); + }); + + conn.connect(); + }); + + proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(v -> { + PgConnectOptions proxied = new PgConnectOptions(backend) + .setHost("localhost") + .setPort(8080); + + PgConnection.connect(vertx, proxied).onComplete(ctx.asyncAssertSuccess(conn -> { + observe.set(true); + + Future + .all( + conn.preparedQuery("WITH s AS (SELECT pg_sleep(1)) SELECT CONCAT('HELLO ', $1) FROM s") + .execute(Tuple.of("WORLD")), + conn.query("SELECT 1").execute() + ) + .eventually(() -> conn.close()) + .onComplete(ctx.asyncAssertSuccess(ar -> { + RowSet first = ar.result().resultAt(0); + RowSet second = ar.result().resultAt(1); + + ctx.assertEquals(1, first.size()); + ctx.assertEquals("HELLO WORLD", first.iterator().next().getString(0)); + + ctx.assertEquals(1, second.size()); + ctx.assertEquals(1, second.iterator().next().getInteger(0).intValue()); + + ctx.assertTrue( + sawSecondQuery.get(), + "Test setup invalid: did not observe frontend simple-query ('Q') message for the second command" + ); + + ctx.assertFalse( + secondQueryTooEarly.get(), + "Second command was written too early before reprepare flow finished " + + "(cachePreparedStatements=" + cachePreparedStatements + + ", readyForQueryCount=" + readyForQueryCount.get() + ")" + ); + + async.complete(); + })); + })); + })); + } + + private static final class TaggedMessageScanner { + private Buffer pending = Buffer.buffer(); + + void handle(Buffer incoming, IntConsumer tagHandler) { + pending.appendBuffer(incoming); + while (true) { + if (pending.length() < 5) { + return; + } + int len = pending.getInt(1); + if (len < 4) { + throw new IllegalStateException("Invalid PostgreSQL message length: " + len); + } + int frameLen = 1 + len; + if (pending.length() < frameLen) { + return; + } + int tag = pending.getByte(0) & 0xFF; + tagHandler.accept(tag); + if (pending.length() == frameLen) { + pending = Buffer.buffer(); + } else { + pending = pending.getBuffer(frameLen, pending.length()); + } + } + } + } +} From 23523c2e2ff937b1e0b4279094d5951cc064513c Mon Sep 17 00:00:00 2001 From: doxlik Date: Wed, 13 May 2026 23:12:57 +0400 Subject: [PATCH 3/5] Fix prepared statement reprepare inflight accounting Signed-off-by: doxlik --- ...paredStatementRepreparePipeliningTest.java | 150 ------------------ .../PreparedStatementReprepareTest.java | 73 +++++++++ .../sqlclient/codec/SocketConnectionBase.java | 4 + 3 files changed, 77 insertions(+), 150 deletions(-) delete mode 100644 vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementRepreparePipeliningTest.java create mode 100644 vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementRepreparePipeliningTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementRepreparePipeliningTest.java deleted file mode 100644 index fc2f8cdfc..000000000 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementRepreparePipeliningTest.java +++ /dev/null @@ -1,150 +0,0 @@ -package io.vertx.tests.pgclient; - -import io.vertx.core.Future; -import io.vertx.core.buffer.Buffer; -import io.vertx.ext.unit.Async; -import io.vertx.ext.unit.TestContext; -import io.vertx.pgclient.PgConnectOptions; -import io.vertx.pgclient.PgConnection; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.Tuple; -import io.vertx.tests.sqlclient.ProxyServer; -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.IntConsumer; - -public class PreparedStatementRepreparePipeliningTest extends PreparedStatementTestBase { - - @Override - protected PgConnectOptions options() { - return new PgConnectOptions(options) - .setPipeliningLimit(1); - } - - @Test - public void testReprepareDoesNotBypassPipeliningLimitWithEnabledCache(TestContext ctx) { - testReprepareDoesNotBypassPipeliningLimit(ctx, true); - } - - @Test - public void testReprepareDoesNotBypassPipeliningLimitWithDisabledCache(TestContext ctx) { - testReprepareDoesNotBypassPipeliningLimit(ctx, false); - } - - private void testReprepareDoesNotBypassPipeliningLimit(TestContext ctx, boolean cachePreparedStatements) { - Async async = ctx.async(); - - PgConnectOptions backend = options().setCachePreparedStatements(cachePreparedStatements); - ProxyServer proxy = ProxyServer.create(vertx, backend.getPort(), backend.getHost()); - - AtomicBoolean observe = new AtomicBoolean(); - AtomicInteger readyForQueryCount = new AtomicInteger(); - AtomicBoolean sawSecondQuery = new AtomicBoolean(); - AtomicBoolean secondQueryTooEarly = new AtomicBoolean(); - - TaggedMessageScanner frontendScanner = new TaggedMessageScanner(); - TaggedMessageScanner backendScanner = new TaggedMessageScanner(); - - proxy.proxyHandler(conn -> { - conn.clientHandler(buff -> { - if (observe.get()) { - frontendScanner.handle(buff, tag -> { - if (tag == 'Q') { - sawSecondQuery.set(true); - if (readyForQueryCount.get() < 3) { - secondQueryTooEarly.set(true); - } - } - }); - } - conn.serverSocket().write(buff); - }); - - conn.serverHandler(buff -> { - if (observe.get()) { - backendScanner.handle(buff, tag -> { - if (tag == 'Z') { - readyForQueryCount.incrementAndGet(); - } - }); - } - conn.clientSocket().write(buff); - }); - - conn.connect(); - }); - - proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(v -> { - PgConnectOptions proxied = new PgConnectOptions(backend) - .setHost("localhost") - .setPort(8080); - - PgConnection.connect(vertx, proxied).onComplete(ctx.asyncAssertSuccess(conn -> { - observe.set(true); - - Future - .all( - conn.preparedQuery("WITH s AS (SELECT pg_sleep(1)) SELECT CONCAT('HELLO ', $1) FROM s") - .execute(Tuple.of("WORLD")), - conn.query("SELECT 1").execute() - ) - .eventually(() -> conn.close()) - .onComplete(ctx.asyncAssertSuccess(ar -> { - RowSet first = ar.result().resultAt(0); - RowSet second = ar.result().resultAt(1); - - ctx.assertEquals(1, first.size()); - ctx.assertEquals("HELLO WORLD", first.iterator().next().getString(0)); - - ctx.assertEquals(1, second.size()); - ctx.assertEquals(1, second.iterator().next().getInteger(0).intValue()); - - ctx.assertTrue( - sawSecondQuery.get(), - "Test setup invalid: did not observe frontend simple-query ('Q') message for the second command" - ); - - ctx.assertFalse( - secondQueryTooEarly.get(), - "Second command was written too early before reprepare flow finished " + - "(cachePreparedStatements=" + cachePreparedStatements + - ", readyForQueryCount=" + readyForQueryCount.get() + ")" - ); - - async.complete(); - })); - })); - })); - } - - private static final class TaggedMessageScanner { - private Buffer pending = Buffer.buffer(); - - void handle(Buffer incoming, IntConsumer tagHandler) { - pending.appendBuffer(incoming); - while (true) { - if (pending.length() < 5) { - return; - } - int len = pending.getInt(1); - if (len < 4) { - throw new IllegalStateException("Invalid PostgreSQL message length: " + len); - } - int frameLen = 1 + len; - if (pending.length() < frameLen) { - return; - } - int tag = pending.getByte(0) & 0xFF; - tagHandler.accept(tag); - if (pending.length() == frameLen) { - pending = Buffer.buffer(); - } else { - pending = pending.getBuffer(frameLen, pending.length()); - } - } - } - } -} diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java new file mode 100644 index 000000000..81d2aa612 --- /dev/null +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java @@ -0,0 +1,73 @@ +package io.vertx.tests.pgclient; + +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgConnection; +import io.vertx.pgclient.impl.PgSocketConnection; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.internal.SqlConnectionInternal; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class PreparedStatementReprepareTest extends PgTestBase { + + private Vertx vertx; + + @Before + public void setup() throws Exception { + super.setup(); + vertx = Vertx.vertx(); + } + + @After + public void tearDown(TestContext ctx) { + vertx.close().onComplete(ctx.asyncAssertSuccess()); + } + + @Test + public void testReprepareDoesNotMakeInflightNegativeWithEnabledCache(TestContext ctx) { + testReprepareDoesNotMakeInflightNegative(ctx, true); + } + + @Test + public void testReprepareDoesNotMakeInflightNegativeWithDisabledCache(TestContext ctx) { + testReprepareDoesNotMakeInflightNegative(ctx, false); + } + + private void testReprepareDoesNotMakeInflightNegative(TestContext ctx, boolean cachePreparedStatements) { + Async async = ctx.async(); + + PgConnectOptions options = new PgConnectOptions(this.options) + .setCachePreparedStatements(cachePreparedStatements); + + PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> { + PgSocketConnection socket = (PgSocketConnection) ((SqlConnectionInternal) conn).unwrap(); + + conn + .preparedQuery("SELECT CONCAT('HELLO ', $1)") + .execute(Tuple.of("WORLD")) + .map(rows -> { + RowSet result = rows; + + ctx.assertEquals(1, result.size()); + ctx.assertEquals("HELLO WORLD", result.iterator().next().getString(0)); + + ctx.assertEquals( + 0, + socket.inflight(), + "Inflight count should be zero after reprepare query completion " + + "(cachePreparedStatements=" + cachePreparedStatements + ")" + ); + + return rows; + }) + .eventually(() -> conn.close()) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } +} diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java index 9fd755f25..377b72fb0 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java @@ -115,6 +115,10 @@ public int pipeliningLimit() { return pipeliningLimit; } + public int inflight() { + return inflight; + } + @Override public TracingPolicy tracingPolicy() { return connectOptions().getTracingPolicy(); From 7eef379a003a75da82812fa5cb60932c58a6d081 Mon Sep 17 00:00:00 2001 From: doxlik Date: Wed, 13 May 2026 23:14:52 +0400 Subject: [PATCH 4/5] Fix prepared statement reprepare inflight accounting Signed-off-by: doxlik --- .../io/vertx/tests/pgclient/PreparedStatementReprepareTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java index 81d2aa612..6b712f569 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java @@ -66,7 +66,7 @@ private void testReprepareDoesNotMakeInflightNegative(TestContext ctx, boolean c return rows; }) - .eventually(() -> conn.close()) + .eventually(conn::close) .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); })); } From a23936ef2af247d165c718c2234ecf41fd23b013 Mon Sep 17 00:00:00 2001 From: doxlik Date: Wed, 13 May 2026 23:20:47 +0400 Subject: [PATCH 5/5] Fix prepared statement reprepare inflight accounting Signed-off-by: doxlik --- .../java/io/vertx/sqlclient/codec/SocketConnectionBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java index 377b72fb0..3d17bd406 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java @@ -340,7 +340,7 @@ private void fireCommandMessage(ChannelHandlerContext chctx, CommandMessage