Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
Expand All @@ -50,6 +51,11 @@
*/
@Slf4j
public abstract class JdbcAbstractSink<T> implements Sink<T> {

private enum State {
OPEN, FAILED, CLOSED
}

// ----- Runtime fields
protected JdbcSinkConfig jdbcSinkConfig;
@Getter
Expand All @@ -73,9 +79,12 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
private AtomicBoolean isFlushing;
private int batchSize;
private ScheduledExecutorService flushExecutor;
private SinkContext sinkContext;
private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
this.sinkContext = sinkContext;
jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext);
jdbcSinkConfig.validate();

Expand Down Expand Up @@ -148,6 +157,7 @@ private static List<String> getListFromConfig(String jdbcSinkConfig) {

@Override
public void close() throws Exception {
state.set(State.CLOSED);
if (flushExecutor != null) {
int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
flushExecutor.shutdown();
Expand Down Expand Up @@ -310,8 +320,9 @@ private void flush() {
connection.rollback();
}
} catch (Exception ex) {
throw new RuntimeException(ex);
log.error("Failed to rollback transaction", ex);
}
fatal(e);
}

isFlushing.set(false);
Expand Down Expand Up @@ -385,4 +396,16 @@ private static boolean isBatchItemFailed(int returnCode) {
return true;
}

/**
* Signal a fatal exception to the framework.
* This will cause the function instance to terminate properly.
*
* @param e the fatal exception
*/
private void fatal(Exception e) {
if (sinkContext != null && state.compareAndSet(State.OPEN, State.FAILED)) {
sinkContext.fatal(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
*/
package org.apache.pulsar.io.jdbc;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -56,6 +61,7 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.io.core.SinkContext;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -133,7 +139,9 @@ protected void configure(Map<String, Object> configuration) {

@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception {
jdbcSink.close();
if (jdbcSink != null) {
jdbcSink.close();
}
sqliteUtils.tearDown();
}

Expand Down Expand Up @@ -860,6 +868,70 @@ public void testNullValueAction(NullValueActionTestConfig config) throws Excepti
}
}

/**
* Test that fatal() is called when an unrecoverable exception occurs during flush.
* This verifies the PIP-297 implementation for proper termination of the sink.
*
* The test works by:
* 1. Opening the sink with a valid table (so open() succeeds)
* 2. Using reflection to replace the insertStatement with a mock that throws SQLException
* 3. Writing a record to trigger flush
* 4. Verifying that fatal() was called with the exception
*/
@Test
public void testFatalCalledOnFlushException() throws Exception {
jdbcSink.close();
jdbcSink = null;

String jdbcUrl = sqliteUtils.sqliteUri();
Map<String, Object> conf = Maps.newHashMap();
conf.put("jdbcUrl", jdbcUrl);
conf.put("tableName", tableName); // Use valid table so open() succeeds
conf.put("key", "field3");
conf.put("nonKey", "field1,field2");
conf.put("batchSize", 1);

SinkContext mockSinkContext = mock(SinkContext.class);
AtomicReference<Throwable> fatalException = new AtomicReference<>();
doAnswer(invocation -> {
fatalException.set(invocation.getArgument(0));
return null;
}).when(mockSinkContext).fatal(any(Throwable.class));

SqliteJdbcAutoSchemaSink sinkWithContext = new SqliteJdbcAutoSchemaSink();
try {
sinkWithContext.open(conf, mockSinkContext);

// Create a mock PreparedStatement that throws SQLException on execute()
PreparedStatement mockStatement = mock(PreparedStatement.class);
SQLException simulatedException = new SQLException("Simulated database connection failure");
doThrow(simulatedException).when(mockStatement).execute();
doThrow(simulatedException).when(mockStatement).executeBatch();

// Use reflection to replace the insertStatement with our mock
FieldUtils.writeField(sinkWithContext, "insertStatement", mockStatement, true);

Foo insertObj = new Foo("f1", "f2", 1);
Map<String, String> props = Maps.newHashMap();
props.put("ACTION", "INSERT");
CompletableFuture<Boolean> future = new CompletableFuture<>();
sinkWithContext.write(createMockFooRecord(insertObj, props, future));

// Wait for the flush to complete and fail
Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
verify(mockSinkContext).fatal(any(Throwable.class));
Assert.assertNotNull(fatalException.get());
Assert.assertTrue(fatalException.get() instanceof SQLException);
Assert.assertEquals(fatalException.get().getMessage(), "Simulated database connection failure");
});

// Verify the record was failed (not acked)
Assert.assertFalse(future.get(1, TimeUnit.SECONDS));
} finally {
sinkWithContext.close();
}
}

@SuppressWarnings("unchecked")
private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties,
CompletableFuture<Boolean> future) {
Expand Down
Loading