diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java new file mode 100644 index 000000000..6b712f569 --- /dev/null +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/PreparedStatementReprepareTest.java @@ -0,0 +1,73 @@ +package io.vertx.tests.pgclient; + +import io.vertx.core.Vertx; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.pgclient.PgConnectOptions; +import io.vertx.pgclient.PgConnection; +import io.vertx.pgclient.impl.PgSocketConnection; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.internal.SqlConnectionInternal; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class PreparedStatementReprepareTest extends PgTestBase { + + private Vertx vertx; + + @Before + public void setup() throws Exception { + super.setup(); + vertx = Vertx.vertx(); + } + + @After + public void tearDown(TestContext ctx) { + vertx.close().onComplete(ctx.asyncAssertSuccess()); + } + + @Test + public void testReprepareDoesNotMakeInflightNegativeWithEnabledCache(TestContext ctx) { + testReprepareDoesNotMakeInflightNegative(ctx, true); + } + + @Test + public void testReprepareDoesNotMakeInflightNegativeWithDisabledCache(TestContext ctx) { + testReprepareDoesNotMakeInflightNegative(ctx, false); + } + + private void testReprepareDoesNotMakeInflightNegative(TestContext ctx, boolean cachePreparedStatements) { + Async async = ctx.async(); + + PgConnectOptions options = new PgConnectOptions(this.options) + .setCachePreparedStatements(cachePreparedStatements); + + PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> { + PgSocketConnection socket = (PgSocketConnection) ((SqlConnectionInternal) conn).unwrap(); + + conn + .preparedQuery("SELECT CONCAT('HELLO ', $1)") + .execute(Tuple.of("WORLD")) + .map(rows -> { + RowSet result = rows; + + ctx.assertEquals(1, result.size()); + ctx.assertEquals("HELLO WORLD", result.iterator().next().getString(0)); + + ctx.assertEquals( + 0, + socket.inflight(), + "Inflight count should be zero after reprepare query completion " + + "(cachePreparedStatements=" + cachePreparedStatements + ")" + ); + + return rows; + }) + .eventually(conn::close) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } +} diff --git a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java index 8b1b9a973..3d17bd406 100644 --- a/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java +++ b/vertx-sql-client-codec/src/main/java/io/vertx/sqlclient/codec/SocketConnectionBase.java @@ -115,6 +115,10 @@ public int pipeliningLimit() { return pipeliningLimit; } + public int inflight() { + return inflight; + } + @Override public TracingPolicy tracingPolicy() { return connectOptions().getTracingPolicy(); @@ -336,6 +340,8 @@ private void fireCommandMessage(ChannelHandlerContext chctx, CommandMessage