diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 6f8148db1..bbfa508e3 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -204,16 +204,154 @@ public static void shutdown() { class UpsertTests { @Test - @DisplayName("Should throw UnsupportedOperationException for upsert") - void testUpsertNewDocument() { + @DisplayName("Should create new document when key doesn't exist and return true") + void testUpsertNewDocument() throws Exception { + String docId = getRandomDocId(4); + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); - objectNode.put("_id", 100); + objectNode.put("id", docId); objectNode.put("item", "NewItem"); objectNode.put("price", 99); Document document = new JSONDocument(objectNode); - Key key = new SingleValueKey("default", "100"); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); + + boolean isNew = flatCollection.upsert(key, document); + + assertTrue(isNew, "Should return true for new document"); + + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("NewItem", rs.getString("item")); + assertEquals(99, rs.getInt("price")); + }); + } + + @Test + @DisplayName("Should merge with existing document preserving unspecified fields") + void testUpsertMergesWithExistingDocument() throws Exception { + String docId = getRandomDocId(4); + + // First, create a document with multiple fields + ObjectNode initialNode = OBJECT_MAPPER.createObjectNode(); + initialNode.put("id", docId); + initialNode.put("item", "Original Item"); + initialNode.put("price", 100); + initialNode.put("quantity", 50); + initialNode.put("in_stock", true); + Document initialDoc = new JSONDocument(initialNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); + + boolean firstResult = flatCollection.upsert(key, initialDoc); + assertTrue(firstResult, "First upsert should create new document"); + + // Now upsert with only some fields - others should be PRESERVED (merge behavior) + ObjectNode mergeNode = OBJECT_MAPPER.createObjectNode(); + mergeNode.put("id", docId); + mergeNode.put("item", "Updated Item"); + // price and quantity are NOT specified - they should retain their original values + Document mergeDoc = new JSONDocument(mergeNode); + + boolean secondResult = flatCollection.upsert(key, mergeDoc); + assertFalse(secondResult, "Second upsert should update existing document"); + + // Verify merge behavior: item updated, price/quantity/in_stock preserved + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Updated Item", rs.getString("item")); + // These should be PRESERVED from the original document (merge semantics) + assertEquals(100, rs.getInt("price")); + assertEquals(50, rs.getInt("quantity")); + assertTrue(rs.getBoolean("in_stock")); + }); + } + + @Test + @DisplayName("Upsert vs CreateOrReplace: upsert preserves, createOrReplace resets to default") + void testUpsertVsCreateOrReplaceBehavior() throws Exception { + String docId1 = getRandomDocId(4); + String docId2 = getRandomDocId(4); - assertThrows(UnsupportedOperationException.class, () -> flatCollection.upsert(key, document)); + // Setup: Create two identical documents + ObjectNode initialNode = OBJECT_MAPPER.createObjectNode(); + initialNode.put("item", "Original Item"); + initialNode.put("price", 100); + initialNode.put("quantity", 50); + + ObjectNode doc1 = initialNode.deepCopy(); + doc1.put("id", docId1); + ObjectNode doc2 = initialNode.deepCopy(); + doc2.put("id", docId2); + + Key key1 = new SingleValueKey(DEFAULT_TENANT, docId1); + Key key2 = new SingleValueKey(DEFAULT_TENANT, docId2); + + flatCollection.upsert(key1, new JSONDocument(doc1)); + flatCollection.upsert(key2, new JSONDocument(doc2)); + + // Now update both with partial documents (only item field) + ObjectNode partialUpdate = OBJECT_MAPPER.createObjectNode(); + partialUpdate.put("item", "Updated Item"); + + ObjectNode partial1 = partialUpdate.deepCopy(); + partial1.put("id", docId1); + ObjectNode partial2 = partialUpdate.deepCopy(); + partial2.put("id", docId2); + + // Use upsert for doc1 - should PRESERVE price and quantity + flatCollection.upsert(key1, new JSONDocument(partial1)); + + // Use createOrReplace for doc2 - should RESET price and quantity to NULL (default) + flatCollection.createOrReplace(key2, new JSONDocument(partial2)); + + // Verify upsert preserved original values + queryAndAssert( + key1, + rs -> { + assertTrue(rs.next()); + assertEquals("Updated Item", rs.getString("item")); + assertEquals(100, rs.getInt("price")); // PRESERVED + assertEquals(50, rs.getInt("quantity")); // PRESERVED + }); + + // Verify createOrReplace reset to defaults + queryAndAssert( + key2, + rs -> { + assertTrue(rs.next()); + assertEquals("Updated Item", rs.getString("item")); + assertNull(rs.getObject("price")); // RESET to NULL + assertNull(rs.getObject("quantity")); // RESET to NULL + }); + } + + @Test + @DisplayName("Should skip unknown fields in upsert (default SKIP strategy)") + void testUpsertSkipsUnknownFields() throws Exception { + String docId = getRandomDocId(4); + + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", docId); + objectNode.put("item", "Item with unknown"); + objectNode.put("price", 200); + objectNode.put("unknown_field", "should be skipped"); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, docId); + + boolean isNew = flatCollection.upsert(key, document); + assertTrue(isNew); + + // Verify only known fields were inserted + queryAndAssert( + key, + rs -> { + assertTrue(rs.next()); + assertEquals("Item with unknown", rs.getString("item")); + assertEquals(200, rs.getInt("price")); + }); } @Test diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 816b85b1a..007776f9e 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -148,7 +148,7 @@ private PostgresQueryParser createParser(Query query) { @Override public boolean upsert(Key key, Document document) throws IOException { - throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); + return upsertWithRetry(key, document, false); } @Override @@ -325,7 +325,7 @@ public boolean bulkUpsert(Map documents) { // Build the bulk upsert SQL with all columns List columnList = new ArrayList<>(allColumns); - String sql = buildBulkUpsertSql(columnList, quotedPkColumn); + String sql = buildMergeUpsertSql(columnList, quotedPkColumn, false); LOGGER.debug("Bulk upsert SQL: {}", sql); try (Connection conn = client.getPooledConnection(); @@ -374,27 +374,34 @@ public boolean bulkUpsert(Map documents) { } /** - * Builds a PostgreSQL bulk upsert SQL statement for batch execution. + * Builds a PostgreSQL upsert SQL statement with merge semantics. + * + *

Generates: INSERT ... ON CONFLICT DO UPDATE SET col = EXCLUDED.col for each column. Only + * columns in the provided list are updated on conflict (merge behavior). * - * @param columns List of quoted column names (PK should be first) + * @param columns List of quoted column names to include * @param pkColumn The quoted primary key column name + * @param includeReturning If true, adds RETURNING clause to detect insert vs update * @return The upsert SQL statement */ - private String buildBulkUpsertSql(List columns, String pkColumn) { + private String buildMergeUpsertSql( + List columns, String pkColumn, boolean includeReturning) { String columnList = String.join(", ", columns); String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new)); - // Build SET clause for non-PK columns: col = EXCLUDED.col (this ensures that on conflict, the - // new value is picked) + // Build SET clause for non-PK columns: col = EXCLUDED.col String setClause = columns.stream() .filter(col -> !col.equals(pkColumn)) .map(col -> col + " = EXCLUDED." + col) .collect(Collectors.joining(", ")); - return String.format( - "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s", - tableIdentifier, columnList, placeholders, pkColumn, setClause); + String sql = + String.format( + "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s", + tableIdentifier, columnList, placeholders, pkColumn, setClause); + + return includeReturning ? sql + " RETURNING (xmax = 0) AS is_insert" : sql; } @Override @@ -880,13 +887,81 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR return executeUpsert(sql, parsed); } catch (PSQLException e) { - return handlePSQLExceptionForUpsert(e, key, document, tableName, isRetry); + return handlePSQLExceptionForCreateOrReplace(e, key, document, tableName, isRetry); } catch (SQLException e) { LOGGER.error("SQLException in createOrReplace. key: {} content: {}", key, document, e); throw new IOException(e); } } + /** + * Upserts a document with merge semantics - only updates columns present in the document, + * preserving existing values for columns not in the document. + * + *

Unlike {@link #createOrReplaceWithRetry}, this method does NOT reset missing columns to + * their default values. + * + * @param key The document key + * @param document The document to upsert + * @param isRetry Whether this is a retry attempt after schema refresh + * @return true if a new document was created, false if an existing document was updated + */ + private boolean upsertWithRetry(Key key, Document document, boolean isRetry) throws IOException { + String tableName = tableIdentifier.getTableName(); + List skippedFields = new ArrayList<>(); + + try { + TypedDocument parsed = parseDocument(document, tableName, skippedFields); + + // Add the key as the primary key column + String pkColumn = getPKForTable(tableName); + String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn); + PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn); + parsed.add(quotedPkColumn, key.toString(), pkType, false); + + List docColumns = parsed.getColumns(); + + String sql = buildUpsertSql(docColumns, quotedPkColumn); + LOGGER.debug("Upsert (merge) SQL: {}", sql); + + return executeUpsert(sql, parsed); + + } catch (PSQLException e) { + return handlePSQLExceptionForUpsert(e, key, document, tableName, isRetry); + } catch (SQLException e) { + LOGGER.error("SQLException in upsert. key: {} content: {}", key, document, e); + throw new IOException(e); + } + } + + /** + * Builds a PostgreSQL upsert SQL statement with merge semantics. + * + *

This method constructs an atomic upsert query that: + * + *

    + *
  • Inserts a new row if no conflict on the primary key + *
  • If the row with that PK already exists, only updates columns present in the document + *
  • Columns NOT in the document retain their existing values (merge behavior) + *
+ * + *

Generated SQL pattern: + * + *

{@code
+   * INSERT INTO table (col1, col2, pk_col)
+   * VALUES (?, ?, ?)
+   * ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
+   * RETURNING (xmax = 0) AS is_insert
+   * }
+ * + * @param docColumns columns present in the document + * @param pkColumn The quoted primary key column name used for conflict detection + * @return The complete upsert SQL statement with placeholders for values + */ + private String buildUpsertSql(List docColumns, String pkColumn) { + return buildMergeUpsertSql(docColumns, pkColumn, true); + } + /** * Builds a PostgreSQL upsert (INSERT ... ON CONFLICT DO UPDATE) SQL statement. * @@ -977,12 +1052,12 @@ private boolean executeUpsert(String sql, TypedDocument parsed) throws SQLExcept } } - private boolean handlePSQLExceptionForUpsert( + private boolean handlePSQLExceptionForCreateOrReplace( PSQLException e, Key key, Document document, String tableName, boolean isRetry) throws IOException { if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) { LOGGER.info( - "Schema mismatch detected during upsert (SQLState: {}), refreshing schema and retrying. key: {}", + "Schema mismatch detected during createOrReplace (SQLState: {}), refreshing schema and retrying. key: {}", e.getSQLState(), key); schemaRegistry.invalidate(tableName); @@ -992,6 +1067,21 @@ private boolean handlePSQLExceptionForUpsert( throw new IOException(e); } + private boolean handlePSQLExceptionForUpsert( + PSQLException e, Key key, Document document, String tableName, boolean isRetry) + throws IOException { + if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) { + LOGGER.info( + "Schema mismatch detected during upsert (SQLState: {}), refreshing schema and retrying. key: {}", + e.getSQLState(), + key); + schemaRegistry.invalidate(tableName); + return upsertWithRetry(key, document, true); + } + LOGGER.error("SQLException in upsert. key: {} content: {}", key, document, e); + throw new IOException(e); + } + private CreateResult handlePSQLExceptionForCreate( PSQLException e, Key key, Document document, String tableName, boolean isRetry) throws IOException { diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java new file mode 100644 index 000000000..6ff30b64e --- /dev/null +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -0,0 +1,352 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import org.hypertrace.core.documentstore.Document; +import org.hypertrace.core.documentstore.JSONDocument; +import org.hypertrace.core.documentstore.Key; +import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.postgresql.util.PSQLException; +import org.postgresql.util.PSQLState; + +@ExtendWith(MockitoExtension.class) +class FlatPostgresCollectionTest { + + private static final String COLLECTION_NAME = "test_flat_collection"; + + @Mock private PostgresClient mockClient; + @Mock private PostgresLazyilyLoadedSchemaRegistry mockSchemaRegistry; + @Mock private Connection mockConnection; + @Mock private PreparedStatement mockPreparedStatement; + @Mock private ResultSet mockResultSet; + + private FlatPostgresCollection flatPostgresCollection; + + @BeforeEach + void setUp() { + when(mockClient.getCustomParameters()).thenReturn(Collections.emptyMap()); + flatPostgresCollection = + new FlatPostgresCollection(mockClient, COLLECTION_NAME, mockSchemaRegistry); + } + + private Map createBasicSchema() { + Map schema = new LinkedHashMap<>(); + schema.put( + "id", + PostgresColumnMetadata.builder() + .colName("id") + .canonicalType(DataType.STRING) + .postgresType(PostgresDataType.TEXT) + .nullable(false) + .isArray(false) + .isPrimaryKey(true) + .build()); + schema.put( + "item", + PostgresColumnMetadata.builder() + .colName("item") + .canonicalType(DataType.STRING) + .postgresType(PostgresDataType.TEXT) + .nullable(true) + .isArray(false) + .isPrimaryKey(false) + .build()); + schema.put( + "price", + PostgresColumnMetadata.builder() + .colName("price") + .canonicalType(DataType.LONG) + .postgresType(PostgresDataType.INTEGER) + .nullable(true) + .isArray(false) + .isPrimaryKey(false) + .build()); + return schema; + } + + private PSQLException createPSQLException(PSQLState state) { + return new PSQLException("Test error", state); + } + + private void setupCommonMocks(Map schema) throws SQLException { + // Mock getColumnOrRefresh for each field in the document + when(mockSchemaRegistry.getColumnOrRefresh(anyString(), anyString())) + .thenAnswer( + invocation -> { + String columnName = invocation.getArgument(1); + return Optional.ofNullable(schema.get(columnName)); + }); + // Mock getPrimaryKeyColumn + when(mockSchemaRegistry.getPrimaryKeyColumn(COLLECTION_NAME)).thenReturn(Optional.of("id")); + when(mockClient.getPooledConnection()).thenReturn(mockConnection); + when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement); + } + + private void setupCreateOrReplaceMocks(Map schema) + throws SQLException { + setupCommonMocks(schema); + when(mockSchemaRegistry.getSchema(COLLECTION_NAME)).thenReturn(schema); + } + + @Nested + @DisplayName("createOrReplace Exception Handling Tests") + class CreateOrReplaceExceptionTests { + + @Test + @DisplayName("Should retry on UNDEFINED_COLUMN PSQLException and succeed") + void testCreateOrReplaceRetriesOnUndefinedColumn() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCreateOrReplaceMocks(schema); + + // First call throws UNDEFINED_COLUMN, second call succeeds + PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); + when(mockPreparedStatement.executeQuery()).thenThrow(psqlException).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(true); + when(mockResultSet.getBoolean("is_insert")).thenReturn(true); + + doNothing().when(mockSchemaRegistry).invalidate(COLLECTION_NAME); + + boolean result = flatPostgresCollection.createOrReplace(key, document); + + assertTrue(result); + verify(mockSchemaRegistry, times(1)).invalidate(COLLECTION_NAME); + verify(mockPreparedStatement, times(2)).executeQuery(); + } + + @Test + @DisplayName("Should retry on DATATYPE_MISMATCH PSQLException and succeed") + void testCreateOrReplaceRetriesOnDatatypeMismatch() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCreateOrReplaceMocks(schema); + + // First call throws DATATYPE_MISMATCH, second call succeeds + PSQLException psqlException = createPSQLException(PSQLState.DATATYPE_MISMATCH); + when(mockPreparedStatement.executeQuery()).thenThrow(psqlException).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(true); + when(mockResultSet.getBoolean("is_insert")).thenReturn(false); + + doNothing().when(mockSchemaRegistry).invalidate(COLLECTION_NAME); + + boolean result = flatPostgresCollection.createOrReplace(key, document); + + assertFalse(result); + verify(mockSchemaRegistry, times(1)).invalidate(COLLECTION_NAME); + verify(mockPreparedStatement, times(2)).executeQuery(); + } + + @Test + @DisplayName("Should throw IOException on non-retryable PSQLException") + void testCreateOrReplaceThrowsOnNonRetryablePSQLException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCreateOrReplaceMocks(schema); + + // Throw a non-retryable PSQLException (e.g., UNIQUE_VIOLATION) + PSQLException psqlException = createPSQLException(PSQLState.UNIQUE_VIOLATION); + when(mockPreparedStatement.executeQuery()).thenThrow(psqlException); + + IOException thrown = + assertThrows( + IOException.class, () -> flatPostgresCollection.createOrReplace(key, document)); + + assertEquals(psqlException, thrown.getCause()); + verify(mockSchemaRegistry, never()).invalidate(anyString()); + } + + @Test + @DisplayName("Should throw IOException on generic SQLException") + void testCreateOrReplaceThrowsOnSQLException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCreateOrReplaceMocks(schema); + + // Throw a generic SQLException (not PSQLException) + SQLException sqlException = new SQLException("Connection lost"); + when(mockPreparedStatement.executeQuery()).thenThrow(sqlException); + + IOException thrown = + assertThrows( + IOException.class, () -> flatPostgresCollection.createOrReplace(key, document)); + + assertEquals(sqlException, thrown.getCause()); + verify(mockSchemaRegistry, never()).invalidate(anyString()); + } + + @Test + @DisplayName("Should throw IOException when retry also fails") + void testCreateOrReplaceThrowsWhenRetryFails() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCreateOrReplaceMocks(schema); + + // Both calls throw UNDEFINED_COLUMN - first triggers retry, second should throw + PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); + when(mockPreparedStatement.executeQuery()).thenThrow(psqlException); + + doNothing().when(mockSchemaRegistry).invalidate(COLLECTION_NAME); + + IOException thrown = + assertThrows( + IOException.class, () -> flatPostgresCollection.createOrReplace(key, document)); + + assertEquals(psqlException, thrown.getCause()); + verify(mockSchemaRegistry, times(1)).invalidate(COLLECTION_NAME); + verify(mockPreparedStatement, times(2)).executeQuery(); + } + } + + @Nested + @DisplayName("upsert Exception Handling Tests") + class UpsertExceptionTests { + + @Test + @DisplayName("Should retry on UNDEFINED_COLUMN PSQLException and succeed") + void testUpsertRetriesOnUndefinedColumn() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + // First call throws UNDEFINED_COLUMN, second call succeeds + PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); + when(mockPreparedStatement.executeQuery()).thenThrow(psqlException).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(true); + when(mockResultSet.getBoolean("is_insert")).thenReturn(true); + + doNothing().when(mockSchemaRegistry).invalidate(COLLECTION_NAME); + + boolean result = flatPostgresCollection.upsert(key, document); + + assertTrue(result); + verify(mockSchemaRegistry, times(1)).invalidate(COLLECTION_NAME); + verify(mockPreparedStatement, times(2)).executeQuery(); + } + + @Test + @DisplayName("Should retry on DATATYPE_MISMATCH PSQLException and succeed") + void testUpsertRetriesOnDatatypeMismatch() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + // First call throws DATATYPE_MISMATCH, second call succeeds + PSQLException psqlException = createPSQLException(PSQLState.DATATYPE_MISMATCH); + when(mockPreparedStatement.executeQuery()).thenThrow(psqlException).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(true); + when(mockResultSet.getBoolean("is_insert")).thenReturn(false); + + doNothing().when(mockSchemaRegistry).invalidate(COLLECTION_NAME); + + boolean result = flatPostgresCollection.upsert(key, document); + + assertFalse(result); + verify(mockSchemaRegistry, times(1)).invalidate(COLLECTION_NAME); + verify(mockPreparedStatement, times(2)).executeQuery(); + } + + @Test + @DisplayName("Should throw IOException on non-retryable PSQLException") + void testUpsertThrowsOnNonRetryablePSQLException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + // Throw a non-retryable PSQLException + PSQLException psqlException = createPSQLException(PSQLState.UNIQUE_VIOLATION); + when(mockPreparedStatement.executeQuery()).thenThrow(psqlException); + + IOException thrown = + assertThrows(IOException.class, () -> flatPostgresCollection.upsert(key, document)); + + assertEquals(psqlException, thrown.getCause()); + verify(mockSchemaRegistry, never()).invalidate(anyString()); + } + + @Test + @DisplayName("Should throw IOException on generic SQLException") + void testUpsertThrowsOnSQLException() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + // Throw a generic SQLException (not PSQLException) + SQLException sqlException = new SQLException("Connection lost"); + when(mockPreparedStatement.executeQuery()).thenThrow(sqlException); + + IOException thrown = + assertThrows(IOException.class, () -> flatPostgresCollection.upsert(key, document)); + + assertEquals(sqlException, thrown.getCause()); + verify(mockSchemaRegistry, never()).invalidate(anyString()); + } + + @Test + @DisplayName("Should throw IOException when retry also fails") + void testUpsertThrowsWhenRetryFails() throws Exception { + Key key = Key.from("test-key"); + Document document = new JSONDocument("{\"item\": \"Test\", \"price\": 100}"); + + Map schema = createBasicSchema(); + setupCommonMocks(schema); + + // Both calls throw UNDEFINED_COLUMN - first triggers retry, second should throw + PSQLException psqlException = createPSQLException(PSQLState.UNDEFINED_COLUMN); + when(mockPreparedStatement.executeQuery()).thenThrow(psqlException); + + doNothing().when(mockSchemaRegistry).invalidate(COLLECTION_NAME); + + IOException thrown = + assertThrows(IOException.class, () -> flatPostgresCollection.upsert(key, document)); + + assertEquals(psqlException, thrown.getCause()); + verify(mockSchemaRegistry, times(1)).invalidate(COLLECTION_NAME); + verify(mockPreparedStatement, times(2)).executeQuery(); + } + } +}