Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2584,10 +2584,16 @@ void testUpdateUnsupportedOperator() {
assertThrows(
IllegalArgumentException.class, () -> flatCollection.update(query, updates, options));
}
}

@Nested
@DisplayName("Bulk Update Operations")
class BulkUpdateTests {

@Test
@DisplayName("Should throw UnsupportedOperationException for bulkUpdate")
void testBulkUpdate() {
@DisplayName("Should update multiple rows and return AFTER_UPDATE documents")
void testBulkUpdateWithAfterUpdateReturn() throws Exception {
// Filter: price > 5 should match multiple rows (IDs 1, 2, 3, 5, 6, 7, 8)
Query query =
Query.builder()
.setFilter(
Expand All @@ -2597,14 +2603,330 @@ void testBulkUpdate() {
ConstantExpression.of(5)))
.build();

List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("price", 100));
List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("quantity", 999));

UpdateOptions options =
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();

CloseableIterator<Document> resultIterator =
flatCollection.bulkUpdate(query, updates, options);

List<Document> results = new ArrayList<>();
while (resultIterator.hasNext()) {
results.add(resultIterator.next());
}
resultIterator.close();

assertTrue(results.size() > 1, "Should return multiple updated documents");

for (Document doc : results) {
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
assertEquals(999, json.get("quantity").asInt(), "All docs should have updated quantity");
}

PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT COUNT(*) FROM \"%s\" WHERE \"quantity\" = 999",
FLAT_COLLECTION_NAME));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());
assertEquals(results.size(), rs.getInt(1), "DB should have same number of updated rows");
}
}

@Test
@DisplayName("Should update multiple rows and return BEFORE_UPDATE documents")
void testBulkUpdateWithBeforeUpdateReturn() throws Exception {
// First, get the original quantities for verification
Map<String, Integer> originalQuantities = new HashMap<>();
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"id\", \"quantity\" FROM \"%s\" WHERE \"price\" > 10",
FLAT_COLLECTION_NAME));
ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
originalQuantities.put(rs.getString("id"), rs.getInt("quantity"));
}
}

// Filter: price > 10 should match a subset of rows
Query query =
Query.builder()
.setFilter(
RelationalExpression.of(
IdentifierExpression.of("price"),
RelationalOperator.GT,
ConstantExpression.of(10)))
.build();

List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("quantity", 888));

UpdateOptions options =
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.BEFORE_UPDATE).build();

CloseableIterator<Document> resultIterator =
flatCollection.bulkUpdate(query, updates, options);

List<Document> results = new ArrayList<>();
while (resultIterator.hasNext()) {
results.add(resultIterator.next());
}
resultIterator.close();

// Verify the returned documents have the ORIGINAL quantities (before update)
for (Document doc : results) {
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
String id = json.get("id").asText();
int returnedQuantity = json.get("quantity").asInt();

assertTrue(originalQuantities.containsKey(id), "Returned doc ID should be in original set");
assertEquals(
originalQuantities.get(id).intValue(),
returnedQuantity,
"Returned quantity should be the ORIGINAL value");
}

// But database should have the NEW value
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"quantity\" FROM \"%s\" WHERE \"price\" > 10",
FLAT_COLLECTION_NAME));
ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
assertEquals(888, rs.getInt("quantity"), "DB should have the updated value");
}
}
}

@Test
@DisplayName("Should return empty iterator when ReturnDocumentType is NONE")
void testBulkUpdateWithNoneReturn() throws Exception {
Query query =
Query.builder()
.setFilter(
RelationalExpression.of(
IdentifierExpression.of("id"),
RelationalOperator.EQ,
ConstantExpression.of("1")))
.build();

List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("price", 123));

UpdateOptions options =
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.NONE).build();

CloseableIterator<Document> resultIterator =
flatCollection.bulkUpdate(query, updates, options);

// Should return empty iterator
assertFalse(resultIterator.hasNext(), "Should return empty iterator for NONE return type");
resultIterator.close();

// But database should still be updated
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"price\" FROM \"%s\" WHERE \"id\" = '1'", FLAT_COLLECTION_NAME));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());
assertEquals(123, rs.getInt("price"));
}
}

@Test
@DisplayName("Should return empty iterator when filter matches no documents")
void testBulkUpdateNoMatchingDocuments() throws Exception {
Query query =
Query.builder()
.setFilter(
RelationalExpression.of(
IdentifierExpression.of("id"),
RelationalOperator.EQ,
ConstantExpression.of("non-existent-id")))
.build();

List<SubDocumentUpdate> updates = List.of(SubDocumentUpdate.of("price", 999));

UpdateOptions options =
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();

CloseableIterator<Document> resultIterator =
flatCollection.bulkUpdate(query, updates, options);

assertFalse(resultIterator.hasNext(), "Should return empty iterator when no docs match");
resultIterator.close();
}

@Test
@DisplayName("Should update with multiple SubDocumentUpdates")
void testBulkUpdateMultipleFields() throws Exception {
// Update item = "Soap" (IDs 1, 5, 8)
Query query =
Query.builder()
.setFilter(
RelationalExpression.of(
IdentifierExpression.of("item"),
RelationalOperator.EQ,
ConstantExpression.of("Soap")))
.build();

// Update both price and quantity
List<SubDocumentUpdate> updates =
List.of(SubDocumentUpdate.of("price", 50), SubDocumentUpdate.of("quantity", 200));

UpdateOptions options =
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();

CloseableIterator<Document> resultIterator =
flatCollection.bulkUpdate(query, updates, options);

List<Document> results = new ArrayList<>();
while (resultIterator.hasNext()) {
results.add(resultIterator.next());
}
resultIterator.close();

assertEquals(3, results.size(), "Should return 3 Soap items");

for (Document doc : results) {
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
assertEquals("Soap", json.get("item").asText());
assertEquals(50, json.get("price").asInt());
assertEquals(200, json.get("quantity").asInt());
}
}

@Test
@DisplayName("Should update nested JSONB paths for multiple documents")
void testBulkUpdateNestedJsonbPath() throws Exception {
// Documents with props JSONB: IDs 1, 3, 5, 7
Query query =
Query.builder()
.setFilter(
RelationalExpression.of(
IdentifierExpression.of("item"),
RelationalOperator.EQ,
ConstantExpression.of("Soap")))
.build();

List<SubDocumentUpdate> updates =
List.of(SubDocumentUpdate.of("props.brand", "BulkUpdatedBrand"));

UpdateOptions options =
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();

CloseableIterator<Document> resultIterator =
flatCollection.bulkUpdate(query, updates, options);

List<Document> results = new ArrayList<>();
while (resultIterator.hasNext()) {
results.add(resultIterator.next());
}
resultIterator.close();

// Verify all returned documents have updated props.brand
for (Document doc : results) {
JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
JsonNode props = json.get("props");
if (props != null && !props.isNull()) {
assertEquals(
"BulkUpdatedBrand", props.get("brand").asText(), "props.brand should be updated");
}
}
}

@Test
@DisplayName("Should throw IllegalArgumentException for empty updates")
void testBulkUpdateEmptyUpdates() {
Query query =
Query.builder()
.setFilter(
RelationalExpression.of(
IdentifierExpression.of("id"),
RelationalOperator.EQ,
ConstantExpression.of("1")))
.build();

List<SubDocumentUpdate> emptyUpdates = Collections.emptyList();

UpdateOptions options =
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();

assertThrows(
UnsupportedOperationException.class,
() -> flatCollection.bulkUpdate(query, updates, options));
IllegalArgumentException.class,
() -> flatCollection.bulkUpdate(query, emptyUpdates, options));
}

@Test
@DisplayName("Should skip non-existent column with default SKIP strategy")
void testBulkUpdateNonExistentColumnWithSkipStrategy() throws Exception {
Query query =
Query.builder()
.setFilter(
RelationalExpression.of(
IdentifierExpression.of("id"),
RelationalOperator.EQ,
ConstantExpression.of("1")))
.build();

// Mix of valid and invalid (non-existent) column paths
List<SubDocumentUpdate> updates =
List.of(
SubDocumentUpdate.of("price", 111),
SubDocumentUpdate.of("nonExistentColumn", "someValue"));

UpdateOptions options =
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();

// Default strategy is SKIP - should not throw, just skip the non-existent column
CloseableIterator<Document> resultIterator =
flatCollection.bulkUpdate(query, updates, options);

List<Document> results = new ArrayList<>();
while (resultIterator.hasNext()) {
results.add(resultIterator.next());
}
resultIterator.close();

assertEquals(1, results.size());
JsonNode json = OBJECT_MAPPER.readTree(results.get(0).toJson());
assertEquals(111, json.get("price").asInt(), "Valid column should be updated");
assertFalse(json.has("nonExistentColumn"), "Non-existent column should not appear");
}

@Test
@DisplayName("Should throw exception for non-existent column with THROW strategy")
void testBulkUpdateNonExistentColumnWithThrowStrategy() {
Collection collectionWithThrowStrategy =
getFlatCollectionWithStrategy(MissingColumnStrategy.THROW.toString());

Query query =
Query.builder()
.setFilter(
RelationalExpression.of(
IdentifierExpression.of("id"),
RelationalOperator.EQ,
ConstantExpression.of("1")))
.build();

List<SubDocumentUpdate> updates =
List.of(SubDocumentUpdate.of("nonExistentColumn", "someValue"));

UpdateOptions options =
UpdateOptions.builder().returnDocumentType(ReturnDocumentType.AFTER_UPDATE).build();

assertThrows(
IOException.class, () -> collectionWithThrowStrategy.bulkUpdate(query, updates, options));
}
}

Expand Down
Loading
Loading