Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,154 @@ public static void shutdown() {
class UpsertTests {

@Test
@DisplayName("Should throw UnsupportedOperationException for upsert")
void testUpsertNewDocument() {
@DisplayName("Should create new document when key doesn't exist and return true")
void testUpsertNewDocument() throws Exception {
String docId = getRandomDocId(4);

ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
objectNode.put("_id", 100);
objectNode.put("id", docId);
objectNode.put("item", "NewItem");
objectNode.put("price", 99);
Document document = new JSONDocument(objectNode);
Key key = new SingleValueKey("default", "100");
Key key = new SingleValueKey(DEFAULT_TENANT, docId);

boolean isNew = flatCollection.upsert(key, document);

assertTrue(isNew, "Should return true for new document");

queryAndAssert(
key,
rs -> {
assertTrue(rs.next());
assertEquals("NewItem", rs.getString("item"));
assertEquals(99, rs.getInt("price"));
});
}

@Test
@DisplayName("Should merge with existing document preserving unspecified fields")
void testUpsertMergesWithExistingDocument() throws Exception {
  String documentId = getRandomDocId(4);
  Key rowKey = new SingleValueKey(DEFAULT_TENANT, documentId);

  // Seed a document carrying several columns.
  ObjectNode seedNode = OBJECT_MAPPER.createObjectNode();
  seedNode.put("id", documentId);
  seedNode.put("item", "Original Item");
  seedNode.put("price", 100);
  seedNode.put("quantity", 50);
  seedNode.put("in_stock", true);

  boolean createdNew = flatCollection.upsert(rowKey, new JSONDocument(seedNode));
  assertTrue(createdNew, "First upsert should create new document");

  // Re-upsert with a partial payload; omitted columns must keep their values.
  ObjectNode partialNode = OBJECT_MAPPER.createObjectNode();
  partialNode.put("id", documentId);
  partialNode.put("item", "Updated Item");
  // price, quantity and in_stock are deliberately absent from this payload.

  boolean updatedExisting = flatCollection.upsert(rowKey, new JSONDocument(partialNode));
  assertFalse(updatedExisting, "Second upsert should update existing document");

  // item was overwritten; every omitted column survived the merge.
  queryAndAssert(
      rowKey,
      rs -> {
        assertTrue(rs.next());
        assertEquals("Updated Item", rs.getString("item"));
        assertEquals(100, rs.getInt("price"));
        assertEquals(50, rs.getInt("quantity"));
        assertTrue(rs.getBoolean("in_stock"));
      });
}

@Test
@DisplayName("Upsert vs CreateOrReplace: upsert preserves, createOrReplace resets to default")
void testUpsertVsCreateOrReplaceBehavior() throws Exception {
  String docId1 = getRandomDocId(4);
  String docId2 = getRandomDocId(4);

  // Setup: create two identical documents so the only variable is the write API used.
  ObjectNode initialNode = OBJECT_MAPPER.createObjectNode();
  initialNode.put("item", "Original Item");
  initialNode.put("price", 100);
  initialNode.put("quantity", 50);

  ObjectNode doc1 = initialNode.deepCopy();
  doc1.put("id", docId1);
  ObjectNode doc2 = initialNode.deepCopy();
  doc2.put("id", docId2);

  Key key1 = new SingleValueKey(DEFAULT_TENANT, docId1);
  Key key2 = new SingleValueKey(DEFAULT_TENANT, docId2);

  flatCollection.upsert(key1, new JSONDocument(doc1));
  flatCollection.upsert(key2, new JSONDocument(doc2));

  // Now update both with partial documents (only the item field).
  ObjectNode partialUpdate = OBJECT_MAPPER.createObjectNode();
  partialUpdate.put("item", "Updated Item");

  ObjectNode partial1 = partialUpdate.deepCopy();
  partial1.put("id", docId1);
  ObjectNode partial2 = partialUpdate.deepCopy();
  partial2.put("id", docId2);

  // upsert on doc1 - should PRESERVE price and quantity (merge semantics).
  flatCollection.upsert(key1, new JSONDocument(partial1));

  // createOrReplace on doc2 - should RESET price and quantity to NULL (default).
  flatCollection.createOrReplace(key2, new JSONDocument(partial2));

  // Verify upsert preserved original values.
  queryAndAssert(
      key1,
      rs -> {
        assertTrue(rs.next());
        assertEquals("Updated Item", rs.getString("item"));
        assertEquals(100, rs.getInt("price")); // PRESERVED
        assertEquals(50, rs.getInt("quantity")); // PRESERVED
      });

  // Verify createOrReplace reset unspecified columns to defaults.
  queryAndAssert(
      key2,
      rs -> {
        assertTrue(rs.next());
        assertEquals("Updated Item", rs.getString("item"));
        assertNull(rs.getObject("price")); // RESET to NULL
        assertNull(rs.getObject("quantity")); // RESET to NULL
      });
}

@Test
@DisplayName("Should skip unknown fields in upsert (default SKIP strategy)")
void testUpsertSkipsUnknownFields() throws Exception {
  String documentId = getRandomDocId(4);

  // Payload mixes schema-known columns with one field the table does not define.
  ObjectNode payload = OBJECT_MAPPER.createObjectNode();
  payload.put("id", documentId);
  payload.put("item", "Item with unknown");
  payload.put("price", 200);
  payload.put("unknown_field", "should be skipped");

  Key rowKey = new SingleValueKey(DEFAULT_TENANT, documentId);
  boolean createdNew = flatCollection.upsert(rowKey, new JSONDocument(payload));
  assertTrue(createdNew);

  // Only the schema-known columns should have been written.
  queryAndAssert(
      rowKey,
      rs -> {
        assertTrue(rs.next());
        assertEquals("Item with unknown", rs.getString("item"));
        assertEquals(200, rs.getInt("price"));
      });
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private PostgresQueryParser createParser(Query query) {

@Override
public boolean upsert(Key key, Document document) throws IOException {
  // Merge semantics: columns absent from the document keep their existing row values.
  // Returns true when a new row was inserted, false when an existing row was updated.
  return upsertWithRetry(key, document, false);
}

@Override
Expand Down Expand Up @@ -325,7 +325,7 @@ public boolean bulkUpsert(Map<Key, Document> documents) {

// Build the bulk upsert SQL with all columns
List<String> columnList = new ArrayList<>(allColumns);
String sql = buildBulkUpsertSql(columnList, quotedPkColumn);
String sql = buildMergeUpsertSql(columnList, quotedPkColumn, false);
LOGGER.debug("Bulk upsert SQL: {}", sql);

try (Connection conn = client.getPooledConnection();
Expand Down Expand Up @@ -374,27 +374,34 @@ public boolean bulkUpsert(Map<Key, Document> documents) {
}

/**
* Builds a PostgreSQL bulk upsert SQL statement for batch execution.
* Builds a PostgreSQL upsert SQL statement with merge semantics.
*
* <p>Generates: INSERT ... ON CONFLICT DO UPDATE SET col = EXCLUDED.col for each column. Only
* columns in the provided list are updated on conflict (merge behavior).
*
* @param columns List of quoted column names (PK should be first)
* @param columns List of quoted column names to include
* @param pkColumn The quoted primary key column name
* @param includeReturning If true, adds RETURNING clause to detect insert vs update
* @return The upsert SQL statement
*/
private String buildBulkUpsertSql(List<String> columns, String pkColumn) {
private String buildMergeUpsertSql(
List<String> columns, String pkColumn, boolean includeReturning) {
String columnList = String.join(", ", columns);
String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new));

// Build SET clause for non-PK columns: col = EXCLUDED.col (this ensures that on conflict, the
// new value is picked)
// Build SET clause for non-PK columns: col = EXCLUDED.col
String setClause =
columns.stream()
.filter(col -> !col.equals(pkColumn))
.map(col -> col + " = EXCLUDED." + col)
.collect(Collectors.joining(", "));

return String.format(
"INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s",
tableIdentifier, columnList, placeholders, pkColumn, setClause);
String sql =
String.format(
"INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s",
tableIdentifier, columnList, placeholders, pkColumn, setClause);

return includeReturning ? sql + " RETURNING (xmax = 0) AS is_insert" : sql;
}

@Override
Expand Down Expand Up @@ -880,13 +887,81 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR
return executeUpsert(sql, parsed);

} catch (PSQLException e) {
return handlePSQLExceptionForUpsert(e, key, document, tableName, isRetry);
return handlePSQLExceptionForCreateOrReplace(e, key, document, tableName, isRetry);
} catch (SQLException e) {
LOGGER.error("SQLException in createOrReplace. key: {} content: {}", key, document, e);
throw new IOException(e);
}
}

/**
 * Upserts a document with merge semantics: columns carried by the document are written, while
 * columns absent from it keep whatever value the existing row already holds.
 *
 * <p>This differs from {@link #createOrReplaceWithRetry}, which resets missing columns to their
 * default values.
 *
 * @param key The document key
 * @param document The document to upsert
 * @param isRetry Whether this is a retry attempt after schema refresh
 * @return true if a new document was created, false if an existing document was updated
 */
private boolean upsertWithRetry(Key key, Document document, boolean isRetry) throws IOException {
  String table = tableIdentifier.getTableName();
  List<String> ignoredFields = new ArrayList<>();

  try {
    TypedDocument typedDoc = parseDocument(document, table, ignoredFields);

    // Append the primary key column derived from the key.
    String pk = getPKForTable(table);
    String quotedPk = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pk);
    PostgresDataType pkDataType = getPrimaryKeyType(table, pk);
    typedDoc.add(quotedPk, key.toString(), pkDataType, false);

    String upsertSql = buildUpsertSql(typedDoc.getColumns(), quotedPk);
    LOGGER.debug("Upsert (merge) SQL: {}", upsertSql);

    return executeUpsert(upsertSql, typedDoc);

  } catch (PSQLException e) {
    return handlePSQLExceptionForUpsert(e, key, document, table, isRetry);
  } catch (SQLException e) {
    LOGGER.error("SQLException in upsert. key: {} content: {}", key, document, e);
    throw new IOException(e);
  }
}

/**
 * Builds the merge-semantics upsert SQL used for single-document writes.
 *
 * <p>The statement inserts a new row on no PK conflict; on conflict it updates only the columns
 * present in the document (via {@code col = EXCLUDED.col}), leaving every other column untouched.
 *
 * <p><b>Generated SQL pattern:</b>
 *
 * <pre>{@code
 * INSERT INTO table (col1, col2, pk_col)
 * VALUES (?, ?, ?)
 * ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
 * RETURNING (xmax = 0) AS is_insert
 * }</pre>
 *
 * @param docColumns columns present in the document
 * @param pkColumn The quoted primary key column name used for conflict detection
 * @return The complete upsert SQL statement with placeholders for values
 */
private String buildUpsertSql(List<String> docColumns, String pkColumn) {
  // Single-document upserts always request the RETURNING clause so the caller can
  // report insert-vs-update.
  return buildMergeUpsertSql(docColumns, pkColumn, true);
}

/**
* Builds a PostgreSQL upsert (INSERT ... ON CONFLICT DO UPDATE) SQL statement.
*
Expand Down Expand Up @@ -977,12 +1052,12 @@ private boolean executeUpsert(String sql, TypedDocument parsed) throws SQLExcept
}
}

private boolean handlePSQLExceptionForUpsert(
private boolean handlePSQLExceptionForCreateOrReplace(
PSQLException e, Key key, Document document, String tableName, boolean isRetry)
throws IOException {
if (!isRetry && shouldRefreshSchemaAndRetry(e.getSQLState())) {
LOGGER.info(
"Schema mismatch detected during upsert (SQLState: {}), refreshing schema and retrying. key: {}",
"Schema mismatch detected during createOrReplace (SQLState: {}), refreshing schema and retrying. key: {}",
e.getSQLState(),
key);
schemaRegistry.invalidate(tableName);
Expand All @@ -992,6 +1067,21 @@ private boolean handlePSQLExceptionForUpsert(
throw new IOException(e);
}

/**
 * Handles a PSQLException raised during upsert: a schema-mismatch SQLState triggers one schema
 * refresh followed by a retry; everything else is wrapped in an IOException.
 */
private boolean handlePSQLExceptionForUpsert(
    PSQLException e, Key key, Document document, String tableName, boolean isRetry)
    throws IOException {
  boolean retryable = !isRetry && shouldRefreshSchemaAndRetry(e.getSQLState());
  if (!retryable) {
    LOGGER.error("SQLException in upsert. key: {} content: {}", key, document, e);
    throw new IOException(e);
  }
  LOGGER.info(
      "Schema mismatch detected during upsert (SQLState: {}), refreshing schema and retrying. key: {}",
      e.getSQLState(),
      key);
  schemaRegistry.invalidate(tableName);
  // isRetry=true on the recursive call prevents unbounded retry loops.
  return upsertWithRetry(key, document, true);
}

private CreateResult handlePSQLExceptionForCreate(
PSQLException e, Key key, Document document, String tableName, boolean isRetry)
throws IOException {
Expand Down
Loading
Loading