From d16beac5f309328274bd845b20de9fdb206b44a4 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Sun, 22 Feb 2026 21:27:50 +0530 Subject: [PATCH 1/7] WIP --- .../postgres/FlatPostgresCollection.java | 66 ++++++++++++++++--- 1 file changed, 57 insertions(+), 9 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 816b85b1a..10805d8f6 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -76,9 +76,9 @@ public class FlatPostgresCollection extends PostgresCollection { private static final Map SUB_DOC_UPDATE_PARSERS = - Map.of( - SET, new FlatCollectionSubDocSetOperatorParser(), - ADD, new FlatCollectionSubDocAddOperatorParser()); + Map.of( + SET, new FlatCollectionSubDocSetOperatorParser(), + ADD, new FlatCollectionSubDocAddOperatorParser()); private final PostgresLazyilyLoadedSchemaRegistry schemaRegistry; @@ -376,7 +376,7 @@ public boolean bulkUpsert(Map documents) { /** * Builds a PostgreSQL bulk upsert SQL statement for batch execution. * - * @param columns List of quoted column names (PK should be first) + * @param columns List of quoted column names (PK should be first) * @param pkColumn The quoted primary key column name * @return The upsert SQL statement */ @@ -548,7 +548,7 @@ public Optional update( * Validates all updates and resolves column names. * * @return Map of path -> columnName for all resolved columns. For example: customAttributes.props - * -> customAttributes (since customAttributes is the top-level JSONB col) + * -> customAttributes (since customAttributes is the top-level JSONB col) */ private Map resolvePathsToColumns( Collection updates, String tableName) { @@ -621,7 +621,9 @@ private Optional resolveColumnName(String path, String tableName) { return Optional.empty(); } - /** Extracts the nested JSONB path from a full path given the resolved column name. */ + /** + * Extracts the nested JSONB path from a full path given the resolved column name. + */ private String[] getNestedPath(String fullPath, String columnName) { if (fullPath.equals(columnName)) { return new String[0]; @@ -732,7 +734,53 @@ public CloseableIterator bulkUpdate( java.util.Collection updates, UpdateOptions updateOptions) throws IOException { - throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); + + Preconditions.checkArgument( + updateOptions != null && !updates.isEmpty(), "Updates collection cannot be NULL or empty"); + + String tableName = tableIdentifier.getTableName(); + CloseableIterator beforeIterator = null; + + try { + ReturnDocumentType returnType = updateOptions.getReturnDocumentType(); + + Map resolvedColumns = resolvePathsToColumns(updates, tableName); + + if (returnType == BEFORE_UPDATE) { + beforeIterator = find(query); + } + + try (Connection connection = client.getPooledConnection()) { + executeUpdate(connection, query, updates, tableName, resolvedColumns); + } + + switch (returnType) { + case AFTER_UPDATE: + return find(query); + + case BEFORE_UPDATE: + return beforeIterator; + + case NONE: + return CloseableIterator.emptyIterator(); + + default: + throw new UnsupportedOperationException( + "Unsupported return document type: " + returnType); + } + + } catch (SQLException e) { + if (beforeIterator != null) { + beforeIterator.close(); + } + LOGGER.error("SQLException during bulkUpdate operation", e); + throw new IOException(e); + } catch (Exception e) { + if (beforeIterator != null) { + beforeIterator.close(); + } + throw new IOException(e); + } } /*isRetry: Whether this is a retry attempt*/ @@ -923,8 +971,8 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR * * * @param allTableColumns all cols present in the table - * @param docColumns cols present in the document - * @param pkColumn The quoted primary key column name used for conflict detection + * @param docColumns cols present in the document + * @param pkColumn The quoted primary key column name used for conflict detection * @return The complete upsert SQL statement with placeholders for values */ private String buildCreateOrReplaceSql( From a5623884b48a9d89f953d9c5a9b5d116dd98c4c9 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 25 Feb 2026 11:49:03 +0530 Subject: [PATCH 2/7] Implement bulkUpdate --- .../FlatCollectionWriteTest.java | 333 +++++++++++++++++- .../postgres/FlatPostgresCollection.java | 130 +++---- 2 files changed, 394 insertions(+), 69 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index bbfa508e3..279e151a2 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -2584,10 +2584,16 @@ void testUpdateUnsupportedOperator() { assertThrows( IllegalArgumentException.class, () -> flatCollection.update(query, updates, options)); } + } + + @Nested + @DisplayName("Bulk Update Operations") + class BulkUpdateTests { @Test - @DisplayName("Should throw UnsupportedOperationException for bulkUpdate") - void testBulkUpdate() { + @DisplayName("Should update multiple rows and return AFTER_UPDATE documents") + void testBulkUpdateWithAfterUpdateReturn() throws Exception { + // Filter: price > 5 should match multiple rows (IDs 1, 2, 3, 5, 6, 7, 8) Query query = Query.builder() .setFilter( @@ -2597,14 +2603,331 @@ void testBulkUpdate() { ConstantExpression.of(5))) .build(); - List updates = List.of(SubDocumentUpdate.of("price", 100)); + List updates = List.of(SubDocumentUpdate.of("quantity", 999)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + assertTrue(results.size() > 1, "Should return multiple updated documents"); + + for (Document doc : results) { + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + assertEquals(999, json.get("quantity").asInt(), "All docs should have updated quantity"); + } + + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT COUNT(*) FROM \"%s\" WHERE \"quantity\" = 999", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(results.size(), rs.getInt(1), "DB should have same number of updated rows"); + } + } + + @Test + @DisplayName("Should update multiple rows and return BEFORE_UPDATE documents") + void testBulkUpdateWithBeforeUpdateReturn() throws Exception { + // First, get the original quantities for verification + Map originalQuantities = new HashMap<>(); + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"id\", \"quantity\" FROM \"%s\" WHERE \"price\" > 10", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + originalQuantities.put(rs.getString("id"), rs.getInt("quantity")); + } + } + + // Filter: price > 10 should match a subset of rows + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("price"), + RelationalOperator.GT, + ConstantExpression.of(10))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("quantity", 888)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.BEFORE_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + // Verify the returned documents have the ORIGINAL quantities (before update) + for (Document doc : results) { + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + String id = json.get("id").asText(); + int returnedQuantity = json.get("quantity").asInt(); + + assertTrue(originalQuantities.containsKey(id), "Returned doc ID should be in original set"); + assertEquals( + originalQuantities.get(id).intValue(), + returnedQuantity, + "Returned quantity should be the ORIGINAL value"); + } + + // But database should have the NEW value + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"quantity\" FROM \"%s\" WHERE \"price\" > 10", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + assertEquals(888, rs.getInt("quantity"), "DB should have the updated value"); + } + } + } + + @Test + @DisplayName("Should return empty iterator when ReturnDocumentType is NONE") + void testBulkUpdateWithNoneReturn() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 123)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.NONE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + // Should return empty iterator + assertFalse(resultIterator.hasNext(), "Should return empty iterator for NONE return type"); + resultIterator.close(); + + // But database should still be updated + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"price\" FROM \"%s\" WHERE \"id\" = '1'", FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(123, rs.getInt("price")); + } + } + + @Test + @DisplayName("Should return empty iterator when filter matches no documents") + void testBulkUpdateNoMatchingDocuments() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("non-existent-id"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 999)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + assertFalse(resultIterator.hasNext(), "Should return empty iterator when no docs match"); + resultIterator.close(); + } + + @Test + @DisplayName("Should update with multiple SubDocumentUpdates") + void testBulkUpdateMultipleFields() throws Exception { + // Update item = "Soap" (IDs 1, 5, 8) + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("item"), + RelationalOperator.EQ, + ConstantExpression.of("Soap"))) + .build(); + + // Update both price and quantity + List updates = + List.of(SubDocumentUpdate.of("price", 50), SubDocumentUpdate.of("quantity", 200)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + assertEquals(3, results.size(), "Should return 3 Soap items"); + + for (Document doc : results) { + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + assertEquals("Soap", json.get("item").asText()); + assertEquals(50, json.get("price").asInt()); + assertEquals(200, json.get("quantity").asInt()); + } + } + + @Test + @DisplayName("Should update nested JSONB paths for multiple documents") + void testBulkUpdateNestedJsonbPath() throws Exception { + // Documents with props JSONB: IDs 1, 3, 5, 7 + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("item"), + RelationalOperator.EQ, + ConstantExpression.of("Soap"))) + .build(); + + List updates = + List.of(SubDocumentUpdate.of("props.brand", "BulkUpdatedBrand")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + // Verify all returned documents have updated props.brand + for (Document doc : results) { + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + JsonNode props = json.get("props"); + if (props != null && !props.isNull()) { + assertEquals( + "BulkUpdatedBrand", props.get("brand").asText(), "props.brand should be updated"); + } + } + } + + @Test + @DisplayName("Should throw IllegalArgumentException for empty updates") + void testBulkUpdateEmptyUpdates() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + List emptyUpdates = Collections.emptyList(); UpdateOptions options = UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); assertThrows( - UnsupportedOperationException.class, - () -> flatCollection.bulkUpdate(query, updates, options)); + IllegalArgumentException.class, + () -> flatCollection.bulkUpdate(query, emptyUpdates, options)); + } + + @Test + @DisplayName("Should skip non-existent column with default SKIP strategy") + void testBulkUpdateNonExistentColumnWithSkipStrategy() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + // Mix of valid and invalid (non-existent) column paths + List updates = + List.of( + SubDocumentUpdate.of("price", 111), + SubDocumentUpdate.of("nonExistentColumn", "someValue")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + // Default strategy is SKIP - should not throw, just skip the non-existent column + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + assertEquals(1, results.size()); + JsonNode json = OBJECT_MAPPER.readTree(results.get(0).toJson()); + assertEquals(111, json.get("price").asInt(), "Valid column should be updated"); + assertFalse(json.has("nonExistentColumn"), "Non-existent column should not appear"); + } + + @Test + @DisplayName("Should throw exception for non-existent column with THROW strategy") + void testBulkUpdateNonExistentColumnWithThrowStrategy() { + Collection collectionWithThrowStrategy = + getFlatCollectionWithStrategy(MissingColumnStrategy.THROW.toString()); + + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + List updates = + List.of(SubDocumentUpdate.of("nonExistentColumn", "someValue")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows( + IllegalArgumentException.class, + () -> collectionWithThrowStrategy.bulkUpdate(query, updates, options)); } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 71c7c60dc..cf4835a57 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -76,9 +76,9 @@ public class FlatPostgresCollection extends PostgresCollection { private static final Map SUB_DOC_UPDATE_PARSERS = - Map.of( - SET, new FlatCollectionSubDocSetOperatorParser(), - ADD, new FlatCollectionSubDocAddOperatorParser()); + Map.of( + SET, new FlatCollectionSubDocSetOperatorParser(), + ADD, new FlatCollectionSubDocAddOperatorParser()); private final PostgresLazyilyLoadedSchemaRegistry schemaRegistry; @@ -551,11 +551,70 @@ public Optional update( } } + @Override + public CloseableIterator bulkUpdate( + org.hypertrace.core.documentstore.query.Query query, + Collection updates, + UpdateOptions updateOptions) + throws IOException { + + Preconditions.checkArgument( + updateOptions != null && !updates.isEmpty(), "Updates cannot be NULL or empty"); + + String tableName = tableIdentifier.getTableName(); + CloseableIterator beforeIterator = null; + + try { + ReturnDocumentType returnType = updateOptions.getReturnDocumentType(); + + Map resolvedColumns = resolvePathsToColumns(updates, tableName); + + if (returnType == BEFORE_UPDATE) { + beforeIterator = find(query); + } + + try (Connection connection = client.getPooledConnection()) { + executeUpdate(connection, query, updates, tableName, resolvedColumns); + } + + switch (returnType) { + case AFTER_UPDATE: + return find(query); + + case BEFORE_UPDATE: + return beforeIterator; + + case NONE: + return CloseableIterator.emptyIterator(); + + default: + throw new UnsupportedOperationException( + "Unsupported return document type: " + returnType); + } + + } catch (SQLException e) { + if (beforeIterator != null) { + beforeIterator.close(); + } + LOGGER.error( + "SQLException during bulkUpdate operation. SQLState: {}, ErrorCode: {}", + e.getSQLState(), + e.getErrorCode(), + e); + throw new IOException(e); + } catch (Exception e) { + if (beforeIterator != null) { + beforeIterator.close(); + } + throw new IOException(e); + } + } + /** * Validates all updates and resolves column names. * * @return Map of path -> columnName for all resolved columns. For example: customAttributes.props - * -> customAttributes (since customAttributes is the top-level JSONB col) + * -> customAttributes (since customAttributes is the top-level JSONB col) */ private Map resolvePathsToColumns( Collection updates, String tableName) { @@ -628,9 +687,7 @@ private Optional resolveColumnName(String path, String tableName) { return Optional.empty(); } - /** - * Extracts the nested JSONB path from a full path given the resolved column name. - */ + /** Extracts the nested JSONB path from a full path given the resolved column name. */ private String[] getNestedPath(String fullPath, String columnName) { if (fullPath.equals(columnName)) { return new String[0]; @@ -735,61 +792,6 @@ private void executeUpdate( } } - @Override - public CloseableIterator bulkUpdate( - org.hypertrace.core.documentstore.query.Query query, - java.util.Collection updates, - UpdateOptions updateOptions) - throws IOException { - - Preconditions.checkArgument( - updateOptions != null && !updates.isEmpty(), "Updates collection cannot be NULL or empty"); - - String tableName = tableIdentifier.getTableName(); - CloseableIterator beforeIterator = null; - - try { - ReturnDocumentType returnType = updateOptions.getReturnDocumentType(); - - Map resolvedColumns = resolvePathsToColumns(updates, tableName); - - if (returnType == BEFORE_UPDATE) { - beforeIterator = find(query); - } - - try (Connection connection = client.getPooledConnection()) { - executeUpdate(connection, query, updates, tableName, resolvedColumns); - } - - switch (returnType) { - case AFTER_UPDATE: - return find(query); - - case BEFORE_UPDATE: - return beforeIterator; - - case NONE: - return CloseableIterator.emptyIterator(); - - default: - throw new UnsupportedOperationException( - "Unsupported return document type: " + returnType); - } - - } catch (SQLException e) { - if (beforeIterator != null) { - beforeIterator.close(); - } - LOGGER.error("SQLException during bulkUpdate operation", e); - throw new IOException(e); - } catch (Exception e) { - if (beforeIterator != null) { - beforeIterator.close(); - } - throw new IOException(e); - } - } - /*isRetry: Whether this is a retry attempt*/ private CreateResult createWithRetry(Key key, Document document, boolean isRetry) throws IOException { @@ -1046,8 +1048,8 @@ private String buildUpsertSql(List docColumns, String pkColumn) { * * * @param allTableColumns all cols present in the table - * @param docColumns cols present in the document - * @param pkColumn The quoted primary key column name used for conflict detection + * @param docColumns cols present in the document + * @param pkColumn The quoted primary key column name used for conflict detection * @return The complete upsert SQL statement with placeholders for values */ private String buildCreateOrReplaceSql( From ff91a929071f81f1590b771a49c58bd5705b4d1c Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 25 Feb 2026 11:53:52 +0530 Subject: [PATCH 3/7] Fix failing test case --- .../hypertrace/core/documentstore/FlatCollectionWriteTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 279e151a2..138229ee3 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -2926,8 +2926,7 @@ void testBulkUpdateNonExistentColumnWithThrowStrategy() { UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); assertThrows( - IllegalArgumentException.class, - () -> collectionWithThrowStrategy.bulkUpdate(query, updates, options)); + IOException.class, () -> collectionWithThrowStrategy.bulkUpdate(query, updates, options)); } } From 5a6e53b485effe8eb80bfdde209851e8a18d15b5 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 25 Feb 2026 12:02:19 +0530 Subject: [PATCH 4/7] Added more coverage --- .../postgres/FlatPostgresCollectionTest.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java index 6ff30b64e..c0fd4c04a 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -18,14 +18,19 @@ import java.sql.SQLException; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.JSONDocument; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; +import org.hypertrace.core.documentstore.model.options.UpdateOptions; +import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.hypertrace.core.documentstore.query.Query; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -349,4 +354,58 @@ void testUpsertThrowsWhenRetryFails() throws Exception { verify(mockPreparedStatement, times(2)).executeQuery(); } } + + @Nested + @DisplayName("bulkUpdate Tests") + class BulkUpdateTests { + + @Test + @DisplayName("Should throw IllegalArgumentException for null options") + void testBulkUpdateThrowsOnNullOptions() { + Query query = Query.builder().build(); + List updates = List.of(SubDocumentUpdate.of("price", 100)); + + assertThrows( + IOException.class, () -> flatPostgresCollection.bulkUpdate(query, updates, null)); + } + + @Test + @DisplayName("Should throw IllegalArgumentException for empty updates") + void testBulkUpdateThrowsOnEmptyUpdates() { + Query query = Query.builder().build(); + List emptyUpdates = Collections.emptyList(); + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows( + IllegalArgumentException.class, + () -> flatPostgresCollection.bulkUpdate(query, emptyUpdates, options)); + } + + @Test + @DisplayName("Should throw IllegalArgumentException for unsupported operator") + void testBulkUpdateThrowsOnUnsupportedOperator() { + Query query = Query.builder().build(); + // UNSET is not supported + List updates = + List.of( + SubDocumentUpdate.builder() + .subDocument("price") + .operator(org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.UNSET) + .build()); + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Map schema = createBasicSchema(); + when(mockSchemaRegistry.getColumnOrRefresh(anyString(), anyString())) + .thenAnswer( + invocation -> { + String columnName = invocation.getArgument(1); + return Optional.ofNullable(schema.get(columnName)); + }); + + assertThrows( + IOException.class, () -> flatPostgresCollection.bulkUpdate(query, updates, options)); + } + } } From dedef6f60b17e1ec24945d017468c3eabb1718c0 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 25 Feb 2026 12:05:33 +0530 Subject: [PATCH 5/7] Fix failing tests --- .../postgres/FlatPostgresCollectionTest.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java index c0fd4c04a..ce7be4782 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -366,7 +366,8 @@ void testBulkUpdateThrowsOnNullOptions() { List updates = List.of(SubDocumentUpdate.of("price", 100)); assertThrows( - IOException.class, () -> flatPostgresCollection.bulkUpdate(query, updates, null)); + IllegalArgumentException.class, + () -> flatPostgresCollection.bulkUpdate(query, updates, null)); } @Test @@ -383,7 +384,7 @@ void testBulkUpdateThrowsOnEmptyUpdates() { } @Test - @DisplayName("Should throw IllegalArgumentException for unsupported operator") + @DisplayName("Should throw IOException for unsupported operator") void testBulkUpdateThrowsOnUnsupportedOperator() { Query query = Query.builder().build(); // UNSET is not supported @@ -396,14 +397,7 @@ void testBulkUpdateThrowsOnUnsupportedOperator() { UpdateOptions options = UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); - Map schema = createBasicSchema(); - when(mockSchemaRegistry.getColumnOrRefresh(anyString(), anyString())) - .thenAnswer( - invocation -> { - String columnName = invocation.getArgument(1); - return Optional.ofNullable(schema.get(columnName)); - }); - + // No stubbing needed - operator check happens before schema lookup assertThrows( IOException.class, () -> flatPostgresCollection.bulkUpdate(query, updates, options)); } From a4cd49a025015fece040e16294bce70766a67676 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 25 Feb 2026 12:09:52 +0530 Subject: [PATCH 6/7] Fix failing tests --- .../postgres/FlatPostgresCollectionTest.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java index ce7be4782..c211f27c4 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -401,5 +401,31 @@ void testBulkUpdateThrowsOnUnsupportedOperator() { assertThrows( IOException.class, () -> flatPostgresCollection.bulkUpdate(query, updates, options)); } + + @Test + @DisplayName("Should throw IOException on SQLException and log SQLState and ErrorCode") + void testBulkUpdateThrowsOnSQLException() throws Exception { + Query query = Query.builder().build(); + List updates = List.of(SubDocumentUpdate.of("price", 100)); + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Map schema = createBasicSchema(); + when(mockSchemaRegistry.getColumnOrRefresh(anyString(), anyString())) + .thenAnswer( + invocation -> { + String columnName = invocation.getArgument(1); + return Optional.ofNullable(schema.get(columnName)); + }); + + SQLException sqlException = new SQLException("Connection failed", "08001", 1001); + when(mockClient.getPooledConnection()).thenThrow(sqlException); + + IOException thrown = + assertThrows( + IOException.class, () -> flatPostgresCollection.bulkUpdate(query, updates, options)); + + assertEquals(sqlException, thrown.getCause()); + } } } From a9ce511ab92b4d0b48fb06d2cbc767d805dfae66 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 25 Feb 2026 12:24:53 +0530 Subject: [PATCH 7/7] Added tests for more coverage --- .../postgres/FlatPostgresCollectionTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java index c211f27c4..fcd941124 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -4,9 +4,13 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -21,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import org.hypertrace.core.documentstore.CloseableIterator; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.JSONDocument; import org.hypertrace.core.documentstore.Key; @@ -427,5 +432,36 @@ void testBulkUpdateThrowsOnSQLException() throws Exception { assertEquals(sqlException, thrown.getCause()); } + + @Test + @DisplayName("Should close beforeIterator on exception when BEFORE_UPDATE") + @SuppressWarnings("unchecked") + void testBulkUpdateClosesIteratorOnException() throws Exception { + // Create spy to mock find() while testing rest of bulkUpdate + FlatPostgresCollection spyCollection = spy(flatPostgresCollection); + + CloseableIterator mockIterator = mock(CloseableIterator.class); + doReturn(mockIterator).when(spyCollection).find(any(Query.class)); + + Query query = Query.builder().build(); + List updates = List.of(SubDocumentUpdate.of("price", 100)); + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.BEFORE_UPDATE).build(); + + Map schema = createBasicSchema(); + when(mockSchemaRegistry.getColumnOrRefresh(anyString(), anyString())) + .thenAnswer( + invocation -> { + String columnName = invocation.getArgument(1); + return Optional.ofNullable(schema.get(columnName)); + }); + + RuntimeException runtimeException = new RuntimeException("Connection pool exhausted"); + when(mockClient.getPooledConnection()).thenThrow(runtimeException); + + assertThrows(IOException.class, () -> spyCollection.bulkUpdate(query, updates, options)); + + verify(mockIterator).close(); + } } }