diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index bbfa508e3..138229ee3 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -2584,10 +2584,16 @@ void testUpdateUnsupportedOperator() { assertThrows( IllegalArgumentException.class, () -> flatCollection.update(query, updates, options)); } + } + + @Nested + @DisplayName("Bulk Update Operations") + class BulkUpdateTests { @Test - @DisplayName("Should throw UnsupportedOperationException for bulkUpdate") - void testBulkUpdate() { + @DisplayName("Should update multiple rows and return AFTER_UPDATE documents") + void testBulkUpdateWithAfterUpdateReturn() throws Exception { + // Filter: price > 5 should match multiple rows (IDs 1, 2, 3, 5, 6, 7, 8) Query query = Query.builder() .setFilter( @@ -2597,14 +2603,330 @@ void testBulkUpdate() { ConstantExpression.of(5))) .build(); - List updates = List.of(SubDocumentUpdate.of("price", 100)); + List updates = List.of(SubDocumentUpdate.of("quantity", 999)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + assertTrue(results.size() > 1, "Should return multiple updated documents"); + + for (Document doc : results) { + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + assertEquals(999, json.get("quantity").asInt(), "All docs should have updated quantity"); + } + + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT COUNT(*) FROM \"%s\" WHERE \"quantity\" = 999", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(results.size(), rs.getInt(1), "DB should have same number of updated rows"); + } + } + + @Test + @DisplayName("Should update multiple rows and return BEFORE_UPDATE documents") + void testBulkUpdateWithBeforeUpdateReturn() throws Exception { + // First, get the original quantities for verification + Map originalQuantities = new HashMap<>(); + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"id\", \"quantity\" FROM \"%s\" WHERE \"price\" > 10", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + originalQuantities.put(rs.getString("id"), rs.getInt("quantity")); + } + } + + // Filter: price > 10 should match a subset of rows + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("price"), + RelationalOperator.GT, + ConstantExpression.of(10))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("quantity", 888)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.BEFORE_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + // Verify the returned documents have the ORIGINAL quantities (before update) + for (Document doc : results) { + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + String id = json.get("id").asText(); + int returnedQuantity = json.get("quantity").asInt(); + + assertTrue(originalQuantities.containsKey(id), "Returned doc ID should be in original set"); + assertEquals( + originalQuantities.get(id).intValue(), + returnedQuantity, + "Returned quantity should be the ORIGINAL value"); + } + + // But database should have the NEW value + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"quantity\" FROM \"%s\" WHERE \"price\" > 10", + FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + assertEquals(888, rs.getInt("quantity"), "DB should have the updated value"); + } + } + } + + @Test + @DisplayName("Should return empty iterator when ReturnDocumentType is NONE") + void testBulkUpdateWithNoneReturn() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 123)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.NONE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + // Should return empty iterator + assertFalse(resultIterator.hasNext(), "Should return empty iterator for NONE return type"); + resultIterator.close(); + + // But database should still be updated + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"price\" FROM \"%s\" WHERE \"id\" = '1'", FLAT_COLLECTION_NAME)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + assertEquals(123, rs.getInt("price")); + } + } + + @Test + @DisplayName("Should return empty iterator when filter matches no documents") + void testBulkUpdateNoMatchingDocuments() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("non-existent-id"))) + .build(); + + List updates = List.of(SubDocumentUpdate.of("price", 999)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + assertFalse(resultIterator.hasNext(), "Should return empty iterator when no docs match"); + resultIterator.close(); + } + + @Test + @DisplayName("Should update with multiple SubDocumentUpdates") + void testBulkUpdateMultipleFields() throws Exception { + // Update item = "Soap" (IDs 1, 5, 8) + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("item"), + RelationalOperator.EQ, + ConstantExpression.of("Soap"))) + .build(); + + // Update both price and quantity + List updates = + List.of(SubDocumentUpdate.of("price", 50), SubDocumentUpdate.of("quantity", 200)); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + assertEquals(3, results.size(), "Should return 3 Soap items"); + + for (Document doc : results) { + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + assertEquals("Soap", json.get("item").asText()); + assertEquals(50, json.get("price").asInt()); + assertEquals(200, json.get("quantity").asInt()); + } + } + + @Test + @DisplayName("Should update nested JSONB paths for multiple documents") + void testBulkUpdateNestedJsonbPath() throws Exception { + // Documents with props JSONB: IDs 1, 3, 5, 7 + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("item"), + RelationalOperator.EQ, + ConstantExpression.of("Soap"))) + .build(); + + List updates = + List.of(SubDocumentUpdate.of("props.brand", "BulkUpdatedBrand")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + // Verify all returned documents have updated props.brand + for (Document doc : results) { + JsonNode json = OBJECT_MAPPER.readTree(doc.toJson()); + JsonNode props = json.get("props"); + if (props != null && !props.isNull()) { + assertEquals( + "BulkUpdatedBrand", props.get("brand").asText(), "props.brand should be updated"); + } + } + } + + @Test + @DisplayName("Should throw IllegalArgumentException for empty updates") + void testBulkUpdateEmptyUpdates() { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + List emptyUpdates = Collections.emptyList(); UpdateOptions options = UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); assertThrows( - UnsupportedOperationException.class, - () -> flatCollection.bulkUpdate(query, updates, options)); + IllegalArgumentException.class, + () -> flatCollection.bulkUpdate(query, emptyUpdates, options)); + } + + @Test + @DisplayName("Should skip non-existent column with default SKIP strategy") + void testBulkUpdateNonExistentColumnWithSkipStrategy() throws Exception { + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + // Mix of valid and invalid (non-existent) column paths + List updates = + List.of( + SubDocumentUpdate.of("price", 111), + SubDocumentUpdate.of("nonExistentColumn", "someValue")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + // Default strategy is SKIP - should not throw, just skip the non-existent column + CloseableIterator resultIterator = + flatCollection.bulkUpdate(query, updates, options); + + List results = new ArrayList<>(); + while (resultIterator.hasNext()) { + results.add(resultIterator.next()); + } + resultIterator.close(); + + assertEquals(1, results.size()); + JsonNode json = OBJECT_MAPPER.readTree(results.get(0).toJson()); + assertEquals(111, json.get("price").asInt(), "Valid column should be updated"); + assertFalse(json.has("nonExistentColumn"), "Non-existent column should not appear"); + } + + @Test + @DisplayName("Should throw exception for non-existent column with THROW strategy") + void testBulkUpdateNonExistentColumnWithThrowStrategy() { + Collection collectionWithThrowStrategy = + getFlatCollectionWithStrategy(MissingColumnStrategy.THROW.toString()); + + Query query = + Query.builder() + .setFilter( + RelationalExpression.of( + IdentifierExpression.of("id"), + RelationalOperator.EQ, + ConstantExpression.of("1"))) + .build(); + + List updates = + List.of(SubDocumentUpdate.of("nonExistentColumn", "someValue")); + + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows( + IOException.class, () -> collectionWithThrowStrategy.bulkUpdate(query, updates, options)); } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 007776f9e..cf4835a57 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -551,6 +551,65 @@ public Optional update( } } + @Override + public CloseableIterator bulkUpdate( + org.hypertrace.core.documentstore.query.Query query, + Collection updates, + UpdateOptions updateOptions) + throws IOException { + + Preconditions.checkArgument( + updateOptions != null && !updates.isEmpty(), "Updates cannot be NULL or empty"); + + String tableName = tableIdentifier.getTableName(); + CloseableIterator beforeIterator = null; + + try { + ReturnDocumentType returnType = updateOptions.getReturnDocumentType(); + + Map resolvedColumns = resolvePathsToColumns(updates, tableName); + + if (returnType == BEFORE_UPDATE) { + beforeIterator = find(query); + } + + try (Connection connection = client.getPooledConnection()) { + executeUpdate(connection, query, updates, tableName, resolvedColumns); + } + + switch (returnType) { + case AFTER_UPDATE: + return find(query); + + case BEFORE_UPDATE: + return beforeIterator; + + case NONE: + return CloseableIterator.emptyIterator(); + + default: + throw new UnsupportedOperationException( + "Unsupported return document type: " + returnType); + } + + } catch (SQLException e) { + if (beforeIterator != null) { + beforeIterator.close(); + } + LOGGER.error( + "SQLException during bulkUpdate operation. SQLState: {}, ErrorCode: {}", + e.getSQLState(), + e.getErrorCode(), + e); + throw new IOException(e); + } catch (Exception e) { + if (beforeIterator != null) { + beforeIterator.close(); + } + throw new IOException(e); + } + } + /** * Validates all updates and resolves column names. * @@ -733,15 +792,6 @@ private void executeUpdate( } } - @Override - public CloseableIterator bulkUpdate( - org.hypertrace.core.documentstore.query.Query query, - java.util.Collection updates, - UpdateOptions updateOptions) - throws IOException { - throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED); - } - /*isRetry: Whether this is a retry attempt*/ private CreateResult createWithRetry(Key key, Document document, boolean isRetry) throws IOException { diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java index 6ff30b64e..fcd941124 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -4,9 +4,13 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -18,14 +22,20 @@ import java.sql.SQLException; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import org.hypertrace.core.documentstore.CloseableIterator; import org.hypertrace.core.documentstore.Document; import org.hypertrace.core.documentstore.JSONDocument; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.expression.impl.DataType; +import org.hypertrace.core.documentstore.model.options.ReturnDocumentType; +import org.hypertrace.core.documentstore.model.options.UpdateOptions; +import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate; import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata; import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.hypertrace.core.documentstore.query.Query; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -349,4 +359,109 @@ void testUpsertThrowsWhenRetryFails() throws Exception { verify(mockPreparedStatement, times(2)).executeQuery(); } } + + @Nested + @DisplayName("bulkUpdate Tests") + class BulkUpdateTests { + + @Test + @DisplayName("Should throw IllegalArgumentException for null options") + void testBulkUpdateThrowsOnNullOptions() { + Query query = Query.builder().build(); + List updates = List.of(SubDocumentUpdate.of("price", 100)); + + assertThrows( + IllegalArgumentException.class, + () -> flatPostgresCollection.bulkUpdate(query, updates, null)); + } + + @Test + @DisplayName("Should throw IllegalArgumentException for empty updates") + void testBulkUpdateThrowsOnEmptyUpdates() { + Query query = Query.builder().build(); + List emptyUpdates = Collections.emptyList(); + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + assertThrows( + IllegalArgumentException.class, + () -> flatPostgresCollection.bulkUpdate(query, emptyUpdates, options)); + } + + @Test + @DisplayName("Should throw IOException for unsupported operator") + void testBulkUpdateThrowsOnUnsupportedOperator() { + Query query = Query.builder().build(); + // UNSET is not supported + List updates = + List.of( + SubDocumentUpdate.builder() + .subDocument("price") + .operator(org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.UNSET) + .build()); + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + // No stubbing needed - operator check happens before schema lookup + assertThrows( + IOException.class, () -> flatPostgresCollection.bulkUpdate(query, updates, options)); + } + + @Test + @DisplayName("Should throw IOException on SQLException and log SQLState and ErrorCode") + void testBulkUpdateThrowsOnSQLException() throws Exception { + Query query = Query.builder().build(); + List updates = List.of(SubDocumentUpdate.of("price", 100)); + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build(); + + Map schema = createBasicSchema(); + when(mockSchemaRegistry.getColumnOrRefresh(anyString(), anyString())) + .thenAnswer( + invocation -> { + String columnName = invocation.getArgument(1); + return Optional.ofNullable(schema.get(columnName)); + }); + + SQLException sqlException = new SQLException("Connection failed", "08001", 1001); + when(mockClient.getPooledConnection()).thenThrow(sqlException); + + IOException thrown = + assertThrows( + IOException.class, () -> flatPostgresCollection.bulkUpdate(query, updates, options)); + + assertEquals(sqlException, thrown.getCause()); + } + + @Test + @DisplayName("Should close beforeIterator on exception when BEFORE_UPDATE") + @SuppressWarnings("unchecked") + void testBulkUpdateClosesIteratorOnException() throws Exception { + // Create spy to mock find() while testing rest of bulkUpdate + FlatPostgresCollection spyCollection = spy(flatPostgresCollection); + + CloseableIterator mockIterator = mock(CloseableIterator.class); + doReturn(mockIterator).when(spyCollection).find(any(Query.class)); + + Query query = Query.builder().build(); + List updates = List.of(SubDocumentUpdate.of("price", 100)); + UpdateOptions options = + UpdateOptions.builder().returnDocumentType(ReturnDocumentType.BEFORE_UPDATE).build(); + + Map schema = createBasicSchema(); + when(mockSchemaRegistry.getColumnOrRefresh(anyString(), anyString())) + .thenAnswer( + invocation -> { + String columnName = invocation.getArgument(1); + return Optional.ofNullable(schema.get(columnName)); + }); + + RuntimeException runtimeException = new RuntimeException("Connection pool exhausted"); + when(mockClient.getPooledConnection()).thenThrow(runtimeException); + + assertThrows(IOException.class, () -> spyCollection.bulkUpdate(query, updates, options)); + + verify(mockIterator).close(); + } + } }