From 5525944dd2173112112678db2783257ad97f14cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C3=A4m?= Date: Thu, 29 Jan 2026 16:26:56 +0100 Subject: [PATCH 1/2] [pulsar-io] feat: implement pip-297 for jdbc sinks --- .../pulsar/io/jdbc/JdbcAbstractSink.java | 25 +++++++++- .../pulsar/io/jdbc/SqliteJdbcSinkTest.java | 47 +++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index ca33b3cfdaba9..73ba6b712f022 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -35,6 +35,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; import lombok.AllArgsConstructor; @@ -50,6 +51,11 @@ */ @Slf4j public abstract class JdbcAbstractSink implements Sink { + + private enum State { + OPEN, FAILED, CLOSED + } + // ----- Runtime fields protected JdbcSinkConfig jdbcSinkConfig; @Getter @@ -73,9 +79,12 @@ public abstract class JdbcAbstractSink implements Sink { private AtomicBoolean isFlushing; private int batchSize; private ScheduledExecutorService flushExecutor; + private SinkContext sinkContext; + private final AtomicReference state = new AtomicReference<>(State.OPEN); @Override public void open(Map config, SinkContext sinkContext) throws Exception { + this.sinkContext = sinkContext; jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext); jdbcSinkConfig.validate(); @@ -148,6 +157,7 @@ private static List getListFromConfig(String jdbcSinkConfig) { @Override public void close() throws Exception { + state.set(State.CLOSED); if (flushExecutor != null) { int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2; flushExecutor.shutdown(); @@ -310,8 +320,9 @@ private void flush() { connection.rollback(); } } catch (Exception ex) { - throw new RuntimeException(ex); + log.error("Failed to rollback transaction", ex); } + fatal(e); } isFlushing.set(false); @@ -385,4 +396,16 @@ private static boolean isBatchItemFailed(int returnCode) { return true; } + /** + * Signal a fatal exception to the framework. + * This will cause the function instance to terminate properly. + * + * @param e the fatal exception + */ + private void fatal(Exception e) { + if (sinkContext != null && state.compareAndSet(State.OPEN, State.FAILED)) { + sinkContext.fatal(e); + } + } + } diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index 901ac9f1e392f..aa7f8c83058cf 100644 --- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java @@ -18,8 +18,10 @@ */ package org.apache.pulsar.io.jdbc; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -56,6 +58,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.source.PulsarRecord; +import org.apache.pulsar.io.core.SinkContext; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -860,6 +863,50 @@ public void testNullValueAction(NullValueActionTestConfig config) throws Excepti } } + /** + * Test that fatal() is called when an unrecoverable exception occurs during flush. + * This verifies the PIP-297 implementation for proper termination of the sink. + */ + @Test + public void testFatalCalledOnFlushException() throws Exception { + jdbcSink.close(); + jdbcSink = null; + + String jdbcUrl = sqliteUtils.sqliteUri(); + Map conf = Maps.newHashMap(); + conf.put("jdbcUrl", jdbcUrl); + conf.put("tableName", "nonexistent_table"); // This will cause an exception on flush + conf.put("key", "field3"); + conf.put("nonKey", "field1,field2"); + conf.put("batchSize", 1); + + SinkContext mockSinkContext = mock(SinkContext.class); + AtomicReference fatalException = new AtomicReference<>(); + doAnswer(invocation -> { + fatalException.set(invocation.getArgument(0)); + return null; + }).when(mockSinkContext).fatal(any(Throwable.class)); + + SqliteJdbcAutoSchemaSink sinkWithContext = new SqliteJdbcAutoSchemaSink(); + try { + sinkWithContext.open(conf, mockSinkContext); + + Foo insertObj = new Foo("f1", "f2", 1); + Map props = Maps.newHashMap(); + props.put("ACTION", "INSERT"); + CompletableFuture future = new CompletableFuture<>(); + sinkWithContext.write(createMockFooRecord(insertObj, props, future)); + + // Wait for the flush to complete and fail + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + verify(mockSinkContext).fatal(any(Throwable.class)); + Assert.assertNotNull(fatalException.get()); + }); + } finally { + sinkWithContext.close(); + } + } + @SuppressWarnings("unchecked") private Record createMockFooRecord(Foo record, Map actionProperties, CompletableFuture future) { From cb2a78861e398b494ec408a15c8a871667f01ad5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C3=A4m?= Date: Thu, 29 Jan 2026 16:47:37 +0100 Subject: [PATCH 2/2] fix: improve test for fatal() on jdbc sink --- .../pulsar/io/jdbc/SqliteJdbcSinkTest.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index aa7f8c83058cf..7cab33df3099b 100644 --- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java @@ -20,11 +20,14 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -136,7 +139,9 @@ protected void configure(Map configuration) { @AfterMethod(alwaysRun = true) public void tearDown() throws Exception { - jdbcSink.close(); + if (jdbcSink != null) { + jdbcSink.close(); + } sqliteUtils.tearDown(); } @@ -866,6 +871,12 @@ public void testNullValueAction(NullValueActionTestConfig config) throws Excepti /** * Test that fatal() is called when an unrecoverable exception occurs during flush. * This verifies the PIP-297 implementation for proper termination of the sink. + * + * The test works by: + * 1. Opening the sink with a valid table (so open() succeeds) + * 2. Using reflection to replace the insertStatement with a mock that throws SQLException + * 3. Writing a record to trigger flush + * 4. Verifying that fatal() was called with the exception */ @Test public void testFatalCalledOnFlushException() throws Exception { @@ -875,7 +886,7 @@ public void testFatalCalledOnFlushException() throws Exception { String jdbcUrl = sqliteUtils.sqliteUri(); Map conf = Maps.newHashMap(); conf.put("jdbcUrl", jdbcUrl); - conf.put("tableName", "nonexistent_table"); // This will cause an exception on flush + conf.put("tableName", tableName); // Use valid table so open() succeeds conf.put("key", "field3"); conf.put("nonKey", "field1,field2"); conf.put("batchSize", 1); @@ -891,6 +902,15 @@ public void testFatalCalledOnFlushException() throws Exception { try { sinkWithContext.open(conf, mockSinkContext); + // Create a mock PreparedStatement that throws SQLException on execute() + PreparedStatement mockStatement = mock(PreparedStatement.class); + SQLException simulatedException = new SQLException("Simulated database connection failure"); + doThrow(simulatedException).when(mockStatement).execute(); + doThrow(simulatedException).when(mockStatement).executeBatch(); + + // Use reflection to replace the insertStatement with our mock + FieldUtils.writeField(sinkWithContext, "insertStatement", mockStatement, true); + Foo insertObj = new Foo("f1", "f2", 1); Map props = Maps.newHashMap(); props.put("ACTION", "INSERT"); @@ -901,7 +921,12 @@ public void testFatalCalledOnFlushException() throws Exception { Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { verify(mockSinkContext).fatal(any(Throwable.class)); Assert.assertNotNull(fatalException.get()); + Assert.assertTrue(fatalException.get() instanceof SQLException); + Assert.assertEquals(fatalException.get().getMessage(), "Simulated database connection failure"); }); + + // Verify the record was failed (not acked) + Assert.assertFalse(future.get(1, TimeUnit.SECONDS)); } finally { sinkWithContext.close(); }