diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java index 4b0cab61f..86bddd550 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java @@ -26,7 +26,7 @@ import io.vertx.pgclient.PgNotice; import io.vertx.pgclient.PgNotification; import io.vertx.pgclient.impl.codec.NoticeResponse; -import io.vertx.pgclient.impl.codec.TxFailedEvent; +import io.vertx.pgclient.impl.codec.TxStatusEvent; import io.vertx.pgclient.spi.PgDriver; import io.vertx.sqlclient.codec.SocketConnectionBase; import io.vertx.sqlclient.internal.SqlConnectionBase; @@ -99,9 +99,9 @@ public void handleEvent(Object event) { } else { notice.log(SocketConnectionBase.logger); } - } else if (event instanceof TxFailedEvent) { + } else if (event instanceof TxStatusEvent) { if (tx != null) { - tx.fail(); + tx.status(((TxStatusEvent) event).status()); } } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java index 0f332a135..5df2e399b 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java @@ -31,7 +31,7 @@ import io.vertx.pgclient.impl.codec.NoticeResponse; import io.vertx.pgclient.impl.codec.PgCodec; import io.vertx.pgclient.impl.codec.PgCommandMessage; -import io.vertx.pgclient.impl.codec.TxFailedEvent; +import io.vertx.pgclient.impl.codec.TxStatusEvent; import io.vertx.sqlclient.codec.CommandMessage; import io.vertx.sqlclient.codec.SocketConnectionBase; import io.vertx.sqlclient.spi.connection.Connection; @@ -41,6 +41,7 @@ import io.vertx.sqlclient.spi.protocol.CommandBase; import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand; import io.vertx.sqlclient.spi.protocol.InitCommand; +import io.vertx.sqlclient.spi.protocol.SavepointCommand; import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand; import io.vertx.sqlclient.spi.protocol.TxCommand; @@ -116,7 +117,7 @@ Future sendCancelRequestMessage(int processId, int secretKey) { @Override protected void handleMessage(Object msg) { super.handleMessage(msg); - if (msg instanceof Notification || msg instanceof TxFailedEvent || msg instanceof NoticeResponse) { + if (msg instanceof Notification || msg instanceof TxStatusEvent || msg instanceof NoticeResponse) { handleEvent(msg); } } @@ -172,6 +173,15 @@ protected void doSchedule(CommandBase cmd, Completable handler) { SocketConnectionBase.NULL_COLLECTOR, QueryResultHandler.NOOP_HANDLER); super.doSchedule(cmd2, (res, err) -> handler.complete(tx.result(), err)); + } else if (cmd instanceof SavepointCommand) { + SavepointCommand savepoint = (SavepointCommand) cmd; + SimpleQueryCommand cmd2 = new SimpleQueryCommand<>( + savepoint.sql(), + false, + false, + SocketConnectionBase.NULL_COLLECTOR, + QueryResultHandler.NOOP_HANDLER); + super.doSchedule(cmd2, (res, err) -> handler.complete(savepoint.result(), err)); } else { super.doSchedule(cmd, handler); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java index d244d2b17..67475925f 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java @@ -237,12 +237,11 @@ private void decodeRowDescription(ByteBuf in) { private void decodeReadyForQuery(ChannelHandlerContext ctx, ByteBuf in) { byte id = in.readByte(); if (id == I) { - // IDLE + ctx.fireChannelRead(TxStatusEvent.IDLE); } else if (id == T) { - // ACTIVE + ctx.fireChannelRead(TxStatusEvent.ACTIVE); } else { - // FAILED - ctx.fireChannelRead(TxFailedEvent.INSTANCE); + ctx.fireChannelRead(TxStatusEvent.FAILED); } codec.peek().handleReadyForQuery(); } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/TxStatusEvent.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/TxStatusEvent.java new file mode 100644 index 000000000..e38543131 --- /dev/null +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/TxStatusEvent.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2011-2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.pgclient.impl.codec; + +import io.vertx.sqlclient.impl.TransactionState; + +public class TxStatusEvent { + + public static final TxStatusEvent IDLE = new TxStatusEvent(TransactionState.IDLE); + public static final TxStatusEvent ACTIVE = new TxStatusEvent(TransactionState.ACTIVE); + public static final TxStatusEvent FAILED = new TxStatusEvent(TransactionState.FAILED); + + private final TransactionState status; + + private TxStatusEvent(TransactionState status) { + this.status = status; + } + + public TransactionState status() { + return status; + } +} diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java index 6b1257e70..8523d88b0 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java @@ -73,6 +73,11 @@ public int appendQueryPlaceholder(StringBuilder queryBuilder, int index, int cur return index; } + @Override + public boolean supportsSavepoints() { + return true; + } + @Override public SqlConnectionInternal wrapConnection(ContextInternal context, ConnectionFactory factory, Connection connection) { return new PgConnectionImpl((PgConnectionFactory) factory, context, connection); diff --git a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/tck/PgTransactionTest.java b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/tck/PgTransactionTest.java index b86ac3558..9ad8adc1e 100644 --- a/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/tck/PgTransactionTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/tests/pgclient/tck/PgTransactionTest.java @@ -10,6 +10,7 @@ */ package io.vertx.tests.pgclient.tck; +import io.vertx.core.Future; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; @@ -17,12 +18,19 @@ import io.vertx.pgclient.PgException; import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.PoolOptions; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.TransactionRollbackException; +import io.vertx.sqlclient.Tuple; import io.vertx.tests.pgclient.junit.ContainerPgRule; import io.vertx.tests.sqlclient.tck.TransactionTestBase; import org.junit.ClassRule; import org.junit.Test; import org.junit.runner.RunWith; +import java.util.Arrays; + @RunWith(VertxUnitRunner.class) public class PgTransactionTest extends TransactionTestBase { @@ -108,4 +116,590 @@ public void testLongTransaction(TestContext ctx) { })); })); } + + @Test + public void testRollbackToSavepointRestoresTransaction(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + insertMutable(res.client, 1, "before") + .compose(v -> res.tx.createSavepoint()) + .compose(sp -> insertMutable(res.client, 2, "rolled-back") + .compose(v -> insertMutable(res.client, 1, "duplicate")) + .compose(v -> Future.failedFuture("Expected duplicate key failure")) + .recover(err -> { + assertSqlState(ctx, err, "23505"); + return sp.rollback() + .compose(v -> sp.release()) + .compose(v -> insertMutable(res.client, 3, "after")) + .compose(v -> res.tx.commit()); + })) + .compose(v -> assertMutableIds(ctx, 1, 3)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testRollbackInnerSavepointKeepsOuterWork(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + insertMutable(res.client, 1, "base") + .compose(v -> res.tx.createSavepoint()) + .compose(sp1 -> insertMutable(res.client, 2, "outer") + .compose(v -> res.tx.createSavepoint()) + .compose(sp2 -> insertMutable(res.client, 3, "inner") + .compose(v -> sp2.rollback()) + .compose(v -> insertMutable(res.client, 4, "after-inner-rollback")) + .compose(v -> sp1.release()) + .compose(v -> res.tx.commit()))) + .compose(v -> assertMutableIds(ctx, 1, 2, 4)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testRollbackInnerThenOuterSavepointKeepsOnlyWorkBeforeOuter(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + insertMutable(res.client, 1, "before-sp1") + .compose(v -> res.tx.createSavepoint()) + .compose(sp1 -> insertMutable(res.client, 2, "between-sp1-sp2") + .compose(v -> res.tx.createSavepoint()) + .compose(sp2 -> insertMutable(res.client, 3, "after-sp2") + .compose(v -> sp2.rollback()) + .compose(v -> insertMutable(res.client, 4, "after-sp2-rollback")) + .compose(v -> sp1.rollback()) + .compose(v -> insertMutable(res.client, 5, "after-sp1-rollback")) + .compose(v -> res.tx.commit()))) + .compose(v -> assertMutableIds(ctx, 1, 5)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testRollbackToSameSavepointTwice(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp -> insertMutable(res.client, 1, "first") + .compose(v -> sp.rollback()) + .compose(v -> insertMutable(res.client, 2, "second")) + .compose(v -> sp.rollback()) + .compose(v -> insertMutable(res.client, 3, "third")) + .compose(v -> res.tx.commit())) + .compose(v -> assertMutableIds(ctx, 3)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testReleaseSavepointKeepsWork(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp -> insertMutable(res.client, 1, "released-scope") + .compose(v -> sp.release()) + .compose(v -> insertMutable(res.client, 2, "after-release")) + .compose(v -> res.tx.commit())) + .compose(v -> assertMutableIds(ctx, 1, 2)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testCommitCleansUpUnreleasedSavepoint(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp -> insertMutable(res.client, 1, "unreleased") + .compose(v -> res.tx.commit())) + .compose(v -> assertMutableIds(ctx, 1)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testReleaseOuterSavepointInvalidatesInnerSavepoint(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + insertMutable(res.client, 1, "base") + .compose(v -> res.tx.createSavepoint()) + .compose(sp1 -> insertMutable(res.client, 2, "outer") + .compose(v -> res.tx.createSavepoint()) + .compose(sp2 -> insertMutable(res.client, 3, "inner") + .compose(v -> sp1.release()) + .compose(v -> sp2.rollback()) + .compose(v -> Future.failedFuture("Expected inner savepoint to be invalidated")) + .recover(err -> { + assertSqlState(ctx, err, "3B001"); + return res.tx.commit(); + }))) + .onComplete(ctx.asyncAssertFailure(err -> { + assertTransactionRollback(ctx, err); + assertMutableIds(ctx).onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + })); + } + + @Test + public void testRollbackOuterSavepointInvalidatesInnerSavepointAndCanRecover(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + insertMutable(res.client, 1, "base") + .compose(v -> res.tx.createSavepoint()) + .compose(sp1 -> insertMutable(res.client, 2, "outer") + .compose(v -> res.tx.createSavepoint()) + .compose(sp2 -> insertMutable(res.client, 3, "inner") + .compose(v -> sp1.rollback()) + .compose(v -> sp2.release()) + .compose(v -> Future.failedFuture("Expected inner savepoint to be invalidated")) + .recover(err -> { + assertSqlState(ctx, err, "3B001"); + return sp1.rollback() + .compose(v -> insertMutable(res.client, 4, "recovered")) + .compose(v -> res.tx.commit()); + }))) + .compose(v -> assertMutableIds(ctx, 1, 4)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testRollbackReleasedSavepointFailsButTransactionCanCommit(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp -> sp.release() + .compose(v -> sp.rollback()) + .compose(v -> Future.failedFuture("Expected rollback on released savepoint to fail")) + .recover(err -> { + ctx.assertEquals("Savepoint already released", err.getMessage()); + return insertMutable(res.client, 1, "still-usable") + .compose(v -> res.tx.commit()); + })) + .compose(v -> assertMutableIds(ctx, 1)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testReleaseReleasedSavepointFailsButTransactionCanCommit(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp -> sp.release() + .compose(v -> sp.release()) + .compose(v -> Future.failedFuture("Expected second release to fail")) + .recover(err -> { + ctx.assertEquals("Savepoint already released", err.getMessage()); + return insertMutable(res.client, 1, "still-usable") + .compose(v -> res.tx.commit()); + })) + .compose(v -> assertMutableIds(ctx, 1)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testCreateSavepointFailsWhileTransactionIsFailed(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp -> insertMutable(res.client, 1, "before-failure") + .compose(v -> insertMutable(res.client, 1, "duplicate")) + .compose(v -> Future.failedFuture("Expected duplicate key failure")) + .recover(err -> { + assertSqlState(ctx, err, "23505"); + return res.tx.createSavepoint() + .compose(v -> Future.failedFuture("Expected create savepoint to fail in failed transaction")) + .recover(err2 -> { + assertSqlState(ctx, err2, "25P02"); + return sp.rollback() + .compose(v -> insertMutable(res.client, 2, "after-recovery")) + .compose(v -> res.tx.commit()); + }); + })) + .compose(v -> assertMutableIds(ctx, 2)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testReleaseSavepointFailsWhileTransactionIsFailed(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp -> insertMutable(res.client, 1, "before-failure") + .compose(v -> insertMutable(res.client, 1, "duplicate")) + .compose(v -> Future.failedFuture("Expected duplicate key failure")) + .recover(err -> { + assertSqlState(ctx, err, "23505"); + return sp.release() + .compose(v -> Future.failedFuture("Expected release to fail in failed transaction")) + .recover(err2 -> { + assertSqlState(ctx, err2, "25P02"); + return sp.rollback() + .compose(v -> insertMutable(res.client, 2, "after-recovery")) + .compose(v -> res.tx.commit()); + }); + })) + .compose(v -> assertMutableIds(ctx, 2)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testCanCreateNewSavepointAfterRollbackToSavepoint(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp1 -> insertMutable(res.client, 1, "before-failure") + .compose(v -> insertMutable(res.client, 1, "duplicate")) + .compose(v -> Future.failedFuture("Expected duplicate key failure")) + .recover(err -> { + assertSqlState(ctx, err, "23505"); + return sp1.rollback() + .compose(v -> res.tx.createSavepoint()) + .compose(sp2 -> insertMutable(res.client, 2, "after-recovery") + .compose(x -> sp2.release())) + .compose(v -> res.tx.commit()); + })) + .compose(v -> assertMutableIds(ctx, 2)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testRollbackToSavepointAfterRepeatedFailedTransactionStatusRestoresTransaction(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + insertMutable(res.client, 1, "before") + .compose(v -> res.tx.createSavepoint()) + .compose(sp -> insertMutable(res.client, 1, "duplicate") + .compose(v -> Future.failedFuture("Expected duplicate key failure")) + .recover(err -> { + assertSqlState(ctx, err, "23505"); + return res.client.query("SELECT 1") + .execute() + .compose(v -> Future.failedFuture("Expected failed transaction error")) + .recover(err2 -> { + assertSqlState(ctx, err2, "25P02"); + return sp.rollback() + .compose(v -> insertMutable(res.client, 2, "after-recovery")) + .compose(v -> res.tx.commit()); + }); + })) + .compose(v -> assertMutableIds(ctx, 1, 2)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testRollbackToSavepointAfterPreparedQueryFailureRestoresTransaction(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + insertMutable(res.client, 1, "before") + .compose(v -> res.tx.createSavepoint()) + .compose(sp -> res.client.preparedQuery("INSERT INTO mutable (id, val) VALUES ($1, $2)") + .execute(Tuple.of(1, "duplicate")) + .compose(v -> Future.failedFuture("Expected duplicate key failure")) + .recover(err -> { + assertSqlState(ctx, err, "23505"); + return sp.rollback() + .compose(v -> insertMutable(res.client, 2, "after-recovery")) + .compose(v -> res.tx.commit()); + })) + .compose(v -> assertMutableIds(ctx, 1, 2)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testRollbackToSavepointAfterPreparedBatchFailureRestoresTransaction(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + insertMutable(res.client, 1, "before") + .compose(v -> res.tx.createSavepoint()) + .compose(sp -> res.client.preparedQuery("INSERT INTO mutable (id, val) VALUES ($1, $2)") + .executeBatch(Arrays.asList( + Tuple.of(2, "batch-before-error"), + Tuple.of(1, "batch-duplicate"), + Tuple.of(3, "batch-after-error") + )) + .compose(v -> Future.failedFuture("Expected duplicate key failure")) + .recover(err -> { + assertSqlState(ctx, err, "23505"); + return sp.rollback() + .compose(v -> insertMutable(res.client, 4, "after-recovery")) + .compose(v -> res.tx.commit()); + })) + .compose(v -> assertMutableIds(ctx, 1, 4)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testSavepointCommandAlreadyInProgress(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertSuccess(sp -> { + Future first = sp.release(); + Future second = sp.release(); + + second.onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Savepoint command already in progress", err.getMessage()); + first + .compose(v -> res.tx.commit()) + .compose(v -> assertMutableIds(ctx)) + .onComplete(ctx.asyncAssertSuccess(x -> async.complete())); + })); + })); + })); + } + + @Test + public void testCreateSavepointAfterCommitRequestedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.client.query("SELECT pg_sleep(0.2)") + .execute(); + + Future commit = res.tx.commit(); + + res.tx.createSavepoint() + .onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + commit.onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + })); + } + + @Test + public void testCreateSavepointAfterRollbackRequestedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.client.query("SELECT pg_sleep(0.2)") + .execute(); + + Future rollback = res.tx.rollback(); + + res.tx.createSavepoint() + .onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + rollback.onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + })); + } + + @Test + public void testRollbackSavepointAfterCommitRequestedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertSuccess(sp -> { + res.client.query("SELECT pg_sleep(0.2)") + .execute(); + + Future commit = res.tx.commit(); + + sp.rollback().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + commit.onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + })); + })); + } + + @Test + public void testRollbackSavepointAfterRollbackRequestedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertSuccess(sp -> { + res.client.query("SELECT pg_sleep(0.2)") + .execute(); + + Future rollback = res.tx.rollback(); + + sp.rollback().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + rollback.onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + })); + })); + } + + @Test + public void testReleaseSavepointAfterCommitRequestedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertSuccess(sp -> { + res.client.query("SELECT pg_sleep(0.2)") + .execute(); + + Future commit = res.tx.commit(); + + sp.release().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + commit.onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + })); + })); + } + + @Test + public void testReleaseSavepointAfterRollbackRequestedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertSuccess(sp -> { + res.client.query("SELECT pg_sleep(0.2)") + .execute(); + + Future rollback = res.tx.rollback(); + + sp.release().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + rollback.onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + })); + })); + } + + @Test + public void testWholeTransactionRollbackWithSavepoint(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp -> insertMutable(res.client, 1, "before-whole-rollback") + .compose(v -> insertMutable(res.client, 2, "still-rolled-back")) + .compose(v -> res.tx.rollback())) + .compose(v -> assertMutableIds(ctx)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + @Test + public void testCreateSavepointAfterRollbackCompletedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.rollback().onComplete(ctx.asyncAssertSuccess(v -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + async.complete(); + })); + })); + })); + } + + @Test + public void testCreateSavepointAfterCommitCompletedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.commit().onComplete(ctx.asyncAssertSuccess(v -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + async.complete(); + })); + })); + })); + } + + @Test + public void testRollbackSavepointAfterCommitCompletedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertSuccess(sp -> { + res.tx.commit().onComplete(ctx.asyncAssertSuccess(v -> { + sp.rollback().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + async.complete(); + })); + })); + })); + })); + } + + @Test + public void testRollbackSavepointAfterRollbackCompletedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertSuccess(sp -> { + res.tx.rollback().onComplete(ctx.asyncAssertSuccess(v -> { + sp.rollback().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + async.complete(); + })); + })); + })); + })); + } + + @Test + public void testReleaseSavepointAfterCommitCompletedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertSuccess(sp -> { + res.tx.commit().onComplete(ctx.asyncAssertSuccess(v -> { + sp.release().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + async.complete(); + })); + })); + })); + })); + } + + @Test + public void testReleaseSavepointAfterRollbackCompletedFails(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint().onComplete(ctx.asyncAssertSuccess(sp -> { + res.tx.rollback().onComplete(ctx.asyncAssertSuccess(v -> { + sp.release().onComplete(ctx.asyncAssertFailure(err -> { + ctx.assertEquals("Transaction already completed", err.getMessage()); + async.complete(); + })); + })); + })); + })); + } + + @Test + public void testReleaseAfterRollbackToSameSavepoint(TestContext ctx) { + Async async = ctx.async(); + connector.accept(ctx.asyncAssertSuccess(res -> { + res.tx.createSavepoint() + .compose(sp -> insertMutable(res.client, 1, "one") + .compose(v -> sp.rollback()) + .compose(v -> insertMutable(res.client, 2, "two")) + .compose(v -> sp.release()) + .compose(v -> res.tx.commit())) + .compose(v -> assertMutableIds(ctx, 2)) + .onComplete(ctx.asyncAssertSuccess(v -> async.complete())); + })); + } + + private Future> insertMutable(SqlConnection client, int id, String val) { + return client.query("INSERT INTO mutable (id, val) VALUES (" + id + ", '" + val + "')").execute(); + } + + private Future assertMutableIds(TestContext ctx, int... expectedIds) { + return getPool() + .query("SELECT id FROM mutable ORDER BY id") + .execute() + .map(rows -> { + ctx.assertEquals(expectedIds.length, rows.size()); + int index = 0; + for (Row row : rows) { + ctx.assertEquals(expectedIds[index++], row.getInteger("id").intValue()); + } + return null; + }); + } + + private void assertSqlState(TestContext ctx, Throwable err, String sqlState) { + ctx.assertTrue(err instanceof PgException); + ctx.assertEquals(sqlState, ((PgException) err).getSqlState()); + } + + private void assertTransactionRollback(TestContext ctx, Throwable err) { + ctx.assertTrue(err instanceof TransactionRollbackException); + } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/Savepoint.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/Savepoint.java new file mode 100644 index 000000000..b2e930832 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/Savepoint.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2011-2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.sqlclient; + +import io.vertx.codegen.annotations.VertxGen; +import io.vertx.core.Future; + +/** + * A savepoint created from a {@link Transaction}. + * + *

A savepoint marks a position inside the current transaction that can later + * be rolled back to, or released when no longer needed. + */ +@VertxGen +public interface Savepoint { + + /** + * Roll back the current transaction to this savepoint. + * + *

The transaction remains active after a successful rollback. + */ + Future rollback(); + + /** + * Release this savepoint. + * + *

After release, this savepoint can no longer be used. + */ + Future release(); +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/Transaction.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/Transaction.java index ccd008011..4a4b2c7f7 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/Transaction.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/Transaction.java @@ -25,6 +25,16 @@ @VertxGen public interface Transaction { + /** + * Create a savepoint in this transaction. + * + *

Fails with {@link UnsupportedOperationException} when the driver does not + * support savepoints. + * + * @return a future notified with the created savepoint + */ + Future createSavepoint(); + /** * Commit the current transaction. */ diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SavepointImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SavepointImpl.java new file mode 100644 index 000000000..a7539bf6f --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/SavepointImpl.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2011-2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.sqlclient.impl; + +import io.vertx.core.Future; +import io.vertx.sqlclient.Savepoint; + +public class SavepointImpl implements Savepoint { + + private enum State { + ACTIVE, + PENDING, + RELEASED + } + + private final TransactionImpl transaction; + private final String name; + private State state; + + public SavepointImpl(TransactionImpl transaction, String name) { + this.transaction = transaction; + this.name = name; + this.state = State.ACTIVE; + } + + @Override + public Future rollback() { + return execute(false, () -> transaction.rollbackToSavepoint(name)); + } + + @Override + public Future release() { + return execute(true, () -> transaction.releaseSavepoint(name)); + } + + private Future execute(boolean release, Action action) { + synchronized (this) { + if (state == State.RELEASED) { + return transaction.failedFuture("Savepoint already released"); + } + if (state == State.PENDING) { + return transaction.failedFuture("Savepoint command already in progress"); + } + state = State.PENDING; + } + return action.execute().andThen(ar -> { + synchronized (SavepointImpl.this) { + if (ar.succeeded()) { + state = release ? State.RELEASED : State.ACTIVE; + } else { + state = State.ACTIVE; + } + } + }); + } + + @FunctionalInterface + private interface Action { + Future execute(); + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java index 154155ac2..a8376ab01 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionImpl.java @@ -19,46 +19,87 @@ import io.vertx.core.*; import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.PromiseInternal; +import io.vertx.sqlclient.Savepoint; import io.vertx.sqlclient.Transaction; import io.vertx.sqlclient.TransactionRollbackException; +import io.vertx.sqlclient.spi.Driver; import io.vertx.sqlclient.spi.connection.Connection; import io.vertx.sqlclient.spi.protocol.CommandBase; +import io.vertx.sqlclient.spi.protocol.SavepointCommand; import io.vertx.sqlclient.spi.protocol.TxCommand; public class TransactionImpl implements Transaction { private final ContextInternal context; private final Connection connection; + private final Driver driver; private final Promise completion; private final Handler endHandler; private int pendingQueries; private boolean ended; - private boolean failed; + private boolean rollbackRequested; + private long savepointSeq; private TxCommand endCommand; + private TransactionState state = TransactionState.ACTIVE; - public TransactionImpl(ContextInternal context, Handler endHandler, Connection connection) { + public TransactionImpl(ContextInternal context, Handler endHandler, Connection connection, Driver driver) { this.context = context; this.connection = connection; + this.driver = driver; this.completion = context.promise(); this.endHandler = endHandler; } public Future begin() { - PromiseInternal promise = context.promise(); - TxCommand begin = new TxCommand<>(TxCommand.Kind.BEGIN, this); - scheduleInternal(begin, wrap(begin, promise)); - return promise.future(); + return submit(new TxCommand<>(TxCommand.Kind.BEGIN, this)); + } + + public void status(TransactionState state) { + synchronized (this) { + this.state = state; + } + } + + Future failedFuture(String message) { + return context.failedFuture(message); + } + + @Override + public Future createSavepoint() { + if (!driver.supportsSavepoints()) { + return context.failedFuture(new UnsupportedOperationException( + "Savepoints are not supported by this driver")); + } + + String name; + synchronized (this) { + name = "__vx_sp_" + (++savepointSeq); + } + SavepointImpl savepoint = new SavepointImpl(this, name); + return submit(new SavepointCommand<>(SavepointCommand.Kind.CREATE, name, savepoint)); + } + + Future rollbackToSavepoint(String name) { + return submit(new SavepointCommand<>(SavepointCommand.Kind.ROLLBACK_TO, name, null)); + } + + Future releaseSavepoint(String name) { + return submit(new SavepointCommand<>(SavepointCommand.Kind.RELEASE, name, null)); } - public void fail() { - failed = true; + private Future submit(CommandBase cmd) { + PromiseInternal promise = context.promise(); + if (!scheduleInternal(cmd, wrap(promise))) { + promise.fail("Transaction already completed"); + } + return promise.future(); } private void execute(CommandBase cmd, Completable handler) { connection.schedule(cmd, handler); } - private Completable wrap(CommandBase cmd, Completable handler) { + private Completable wrap(Completable handler) { return (res, err) -> { synchronized (TransactionImpl.this) { pendingQueries--; @@ -69,7 +110,7 @@ private Completable wrap(CommandBase cmd, Completable handler) { } public void schedule(CommandBase cmd, Completable handler) { - if (!scheduleInternal(cmd, wrap(cmd, handler))) { + if (!scheduleInternal(cmd, wrap(handler))) { handler.fail("Transaction already completed"); } } @@ -92,7 +133,7 @@ private void checkEnd() { if (pendingQueries > 0 || !ended || endCommand != null) { return; } - TxCommand.Kind kind = failed ? TxCommand.Kind.ROLLBACK : TxCommand.Kind.COMMIT; + TxCommand.Kind kind = rollbackRequested || state == TransactionState.FAILED ? TxCommand.Kind.ROLLBACK : TxCommand.Kind.COMMIT; cmd = new TxCommand<>(kind, null); handler = (res, err) -> { if (err == null) { @@ -113,7 +154,7 @@ private Future end(boolean rollback) { return context.failedFuture("Transaction already complete"); } ended = true; - failed |= rollback; + rollbackRequested |= rollback; } checkEnd(); return completion.future(); diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/TxFailedEvent.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionState.java similarity index 68% rename from vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/TxFailedEvent.java rename to vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionState.java index 283c1cb43..78e459a00 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/TxFailedEvent.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/TransactionState.java @@ -8,13 +8,10 @@ * * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 */ -package io.vertx.pgclient.impl.codec; - -/** - * Event to signal a transaction is failed. - */ -public class TxFailedEvent { - - public static final TxFailedEvent INSTANCE = new TxFailedEvent(); +package io.vertx.sqlclient.impl; +public enum TransactionState { + IDLE, + ACTIVE, + FAILED } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java index 88bdf0890..43c8885af 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java @@ -176,7 +176,7 @@ public Future begin() { if (tx != null) { throw new IllegalStateException(); } - tx = new TransactionImpl(context, v -> tx = null, conn); + tx = new TransactionImpl(context, v -> tx = null, conn, driver()); return tx.begin(); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/Driver.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/Driver.java index b01765f16..1d099fff7 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/Driver.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/Driver.java @@ -137,4 +137,11 @@ default int appendQueryPlaceholder(StringBuilder queryBuilder, int index, int cu queryBuilder.append("?"); return current; } + + /** + * @return {@code true} when the driver supports savepoints. + */ + default boolean supportsSavepoints() { + return false; + } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/protocol/SavepointCommand.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/protocol/SavepointCommand.java new file mode 100644 index 000000000..d890d1693 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/protocol/SavepointCommand.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2011-2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.sqlclient.spi.protocol; + +public class SavepointCommand extends CommandBase { + + public enum Kind { + CREATE("SAVEPOINT "), + ROLLBACK_TO("ROLLBACK TO SAVEPOINT "), + RELEASE("RELEASE SAVEPOINT "); + + private final String sqlPrefix; + + Kind(String sqlPrefix) { + this.sqlPrefix = sqlPrefix; + } + + public String sql(String name) { + return sqlPrefix + name; + } + } + + private final Kind kind; + private final String name; + private final R result; + + public SavepointCommand(Kind kind, String name, R result) { + this.kind = kind; + this.name = name; + this.result = result; + } + + public Kind kind() { + return kind; + } + + public String name() { + return name; + } + + public String sql() { + return kind.sql(name); + } + + public R result() { + return result; + } +}