Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,245 @@ void testBulkUpsertAndReturnOlderDocumentsUpsertFailure() throws Exception {
}
}
}

@Test
@DisplayName("Should bulk createOrReplace multiple new documents")
void testBulkCreateOrReplaceNewDocuments() throws Exception {
Map<Key, Document> bulkMap = new LinkedHashMap<>();

ObjectNode node1 = OBJECT_MAPPER.createObjectNode();
node1.put("item", "BulkReplaceItem1");
node1.put("price", 201);
node1.put("quantity", 10);
bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"), new JSONDocument(node1));

ObjectNode node2 = OBJECT_MAPPER.createObjectNode();
node2.put("item", "BulkReplaceItem2");
node2.put("price", 202);
node2.put("quantity", 20);
bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"), new JSONDocument(node2));

boolean result = flatCollection.bulkCreateOrReplace(bulkMap);

assertTrue(result);

queryAndAssert(
new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"),
rs -> {
assertTrue(rs.next());
assertEquals("BulkReplaceItem1", rs.getString("item"));
assertEquals(201, rs.getInt("price"));
assertEquals(10, rs.getInt("quantity"));
});

queryAndAssert(
new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"),
rs -> {
assertTrue(rs.next());
assertEquals("BulkReplaceItem2", rs.getString("item"));
assertEquals(202, rs.getInt("price"));
assertEquals(20, rs.getInt("quantity"));
});
}

@Test
@DisplayName(
"Should bulk createOrReplace replacing existing documents and reset missing cols to default")
void testBulkCreateOrReplaceResetsUnspecifiedColumnsToDefault() throws Exception {
// First create documents with multiple fields
String docId1 = "bulk-replace-reset-1";
String docId2 = "bulk-replace-reset-2";

ObjectNode initial1 = OBJECT_MAPPER.createObjectNode();
initial1.put("item", "Original1");
initial1.put("price", 100);
initial1.put("quantity", 50);
initial1.put("in_stock", true);
flatCollection.createOrReplace(
new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1));

ObjectNode initial2 = OBJECT_MAPPER.createObjectNode();
initial2.put("item", "Original2");
initial2.put("price", 200);
initial2.put("quantity", 75);
initial2.put("in_stock", false);
flatCollection.createOrReplace(
new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2));

// Now bulk createOrReplace with only some fields - others should be RESET to default
Map<Key, Document> bulkMap = new LinkedHashMap<>();

ObjectNode updated1 = OBJECT_MAPPER.createObjectNode();
updated1.put("item", "Updated1");
// price, quantity, in_stock are NOT specified - should be reset to NULL
bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1));

ObjectNode updated2 = OBJECT_MAPPER.createObjectNode();
updated2.put("item", "Updated2");
updated2.put("price", 999);
// quantity, in_stock are NOT specified - should be reset to NULL
bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2));

boolean result = flatCollection.bulkCreateOrReplace(bulkMap);

assertTrue(result);

// Verify doc1: item updated, other fields reset to NULL
queryAndAssert(
new SingleValueKey(DEFAULT_TENANT, docId1),
rs -> {
assertTrue(rs.next());
assertEquals("Updated1", rs.getString("item"));
assertNull(rs.getObject("price")); // RESET to NULL
assertNull(rs.getObject("quantity")); // RESET to NULL
assertNull(rs.getObject("in_stock")); // RESET to NULL
});

// Verify doc2: item and price updated, other fields reset to NULL
queryAndAssert(
new SingleValueKey(DEFAULT_TENANT, docId2),
rs -> {
assertTrue(rs.next());
assertEquals("Updated2", rs.getString("item"));
assertEquals(999, rs.getInt("price"));
assertNull(rs.getObject("quantity")); // RESET to NULL
assertNull(rs.getObject("in_stock")); // RESET to NULL
});
}

@Test
@DisplayName("bulkUpsert vs bulkCreateOrReplace: upsert preserves, createOrReplace resets")
void testBulkUpsertVsBulkCreateOrReplaceBehavior() throws Exception {
// Setup: Create two identical documents
String docId1 = "bulk-compare-upsert";
String docId2 = "bulk-compare-replace";

ObjectNode initial = OBJECT_MAPPER.createObjectNode();
initial.put("item", "Original Item");
initial.put("price", 100);
initial.put("quantity", 50);

flatCollection.createOrReplace(
new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial.deepCopy()));
flatCollection.createOrReplace(
new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial.deepCopy()));

// Now update both with partial documents (only item field)
ObjectNode partialUpdate = OBJECT_MAPPER.createObjectNode();
partialUpdate.put("item", "Updated Item");

// Use bulkUpsert for doc1 - should PRESERVE price and quantity
Map<Key, Document> upsertMap = new LinkedHashMap<>();
upsertMap.put(
new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(partialUpdate.deepCopy()));
flatCollection.bulkUpsert(upsertMap);

// Use bulkCreateOrReplace for doc2 - should RESET price and quantity to NULL
Map<Key, Document> replaceMap = new LinkedHashMap<>();
replaceMap.put(
new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(partialUpdate.deepCopy()));
flatCollection.bulkCreateOrReplace(replaceMap);

// Verify bulkUpsert preserved original values
queryAndAssert(
new SingleValueKey(DEFAULT_TENANT, docId1),
rs -> {
assertTrue(rs.next());
assertEquals("Updated Item", rs.getString("item"));
assertEquals(100, rs.getInt("price")); // PRESERVED
assertEquals(50, rs.getInt("quantity")); // PRESERVED
});

// Verify bulkCreateOrReplace reset to defaults
queryAndAssert(
new SingleValueKey(DEFAULT_TENANT, docId2),
rs -> {
assertTrue(rs.next());
assertEquals("Updated Item", rs.getString("item"));
assertNull(rs.getObject("price")); // RESET to NULL
assertNull(rs.getObject("quantity")); // RESET to NULL
});
}

@Test
@DisplayName("Should handle empty document map for bulkCreateOrReplace")
void testBulkCreateOrReplaceEmptyMap() {
Map<Key, Document> emptyMap = Collections.emptyMap();
boolean result = flatCollection.bulkCreateOrReplace(emptyMap);
assertTrue(result);
}

@Test
@DisplayName("Should handle null document map for bulkCreateOrReplace")
void testBulkCreateOrReplaceNullMap() {
boolean result = flatCollection.bulkCreateOrReplace(null);
assertTrue(result);
}

@Test
@DisplayName("Should skip unknown fields in bulkCreateOrReplace")
void testBulkCreateOrReplaceSkipsUnknownFields() throws Exception {
Map<Key, Document> bulkMap = new LinkedHashMap<>();

ObjectNode node = OBJECT_MAPPER.createObjectNode();
node.put("item", "ItemWithUnknown");
node.put("price", 300);
node.put("unknown_field", "should be skipped");
bulkMap.put(
new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"), new JSONDocument(node));

boolean result = flatCollection.bulkCreateOrReplace(bulkMap);

assertTrue(result);

queryAndAssert(
new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"),
rs -> {
assertTrue(rs.next());
assertEquals("ItemWithUnknown", rs.getString("item"));
assertEquals(300, rs.getInt("price"));
});
}

@Test
@DisplayName(
"Should ignore documents with unknown fields when IGNORE_DOCUMENT strategy for bulkCreateOrReplace")
void testBulkCreateOrReplaceIgnoreDocumentStrategy() throws Exception {
Collection collectionWithIgnoreStrategy =
getFlatCollectionWithStrategy(MissingColumnStrategy.IGNORE_DOCUMENT.toString());

Map<Key, Document> bulkMap = new LinkedHashMap<>();

// Document with unknown field - should be ignored
ObjectNode nodeWithUnknown = OBJECT_MAPPER.createObjectNode();
nodeWithUnknown.put("item", "ItemWithUnknown");
nodeWithUnknown.put("unknown_field", "should cause document to be ignored");
bulkMap.put(
new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"),
new JSONDocument(nodeWithUnknown));

// Valid document - should be inserted
ObjectNode validNode = OBJECT_MAPPER.createObjectNode();
validNode.put("item", "ValidItem");
validNode.put("price", 200);
bulkMap.put(
new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"), new JSONDocument(validNode));

boolean result = collectionWithIgnoreStrategy.bulkCreateOrReplace(bulkMap);

assertTrue(result);

queryAndAssert(
new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"), rs -> assertFalse(rs.next()));

queryAndAssert(
new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"),
rs -> {
assertTrue(rs.next());
assertEquals("ValidItem", rs.getString("item"));
assertEquals(200, rs.getInt("price"));
});
}
}

@Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,16 @@

/** Interface spec for common operations on a collection of documents */
public interface Collection {

/**
* Upsert (create a new doc or update if one already exists) the given document into the doc
* store.
*
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
* the existing fields are modified is implementation specific. For example, upserting <code>
* {
* "foo2": "bar2"
* }
* { "foo2": "bar2" }
* </code> if a document <code>
* {
* "foo1": "bar1"
* }
* { "foo1": "bar1" }
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
* the "foo1" field is implementation specific
*
Expand All @@ -46,13 +43,9 @@ public interface Collection {
*
* <p>Note: This method ensures that all the fields defined in the `Document` are set/created. How
* the existing fields are modified is implementation specific. For example, upserting <code>
* {
* "foo2": "bar2"
* }
* { "foo2": "bar2" }
* </code> if a document <code>
* {
* "foo1": "bar1"
* }
* { "foo1": "bar1" }
* </code> already exists would ensure that "foo2" is set the value of "bar2" and what happens to
* the "foo1" field is implementation specific
*
Expand Down Expand Up @@ -104,10 +97,10 @@ public interface Collection {
/**
* Search for documents matching the query.
*
* @deprecated Use {@link #query(org.hypertrace.core.documentstore.query.Query, QueryOptions)}
* instead
* @param query filter to query matching documents
* @return {@link CloseableIterator} of matching documents
* @deprecated Use {@link #query(org.hypertrace.core.documentstore.query.Query, QueryOptions)}
* instead
*/
@Deprecated(forRemoval = true)
CloseableIterator<Document> search(Query query);
Expand Down Expand Up @@ -268,6 +261,30 @@ CloseableIterator<Document> bulkUpsertAndReturnOlderDocuments(Map<Key, Document>
*/
boolean createOrReplace(final Key key, final Document document) throws IOException;

/**
* Bulk createOrReplace with no atomicity guarantee. It partial documents succeed, the operation
* is not rolled back. It's possible that certain document are ignored, if they contain columns
* that are not present in the table's schema. This happens when the missingColumnStrategy is
* configured to {@link
* org.hypertrace.core.documentstore.model.options.MissingColumnStrategy#IGNORE_DOCUMENT}. If it's
* configured to {@link
* org.hypertrace.core.documentstore.model.options.MissingColumnStrategy#SKIP}, then that column
* is skipped (but the document is still created/replaced). If it's configured to be {@link
* org.hypertrace.core.documentstore.model.options.MissingColumnStrategy#THROW}, the entire batch
* fails.
*
* <p>Semantically, if the document already exists, each column is replaced with its new value (or
* to its default value if not specified). Note that no merge happens. For example, if the
* original row contains "tag" : {"k1": "v1"} and the new row contains "tag" : {"k2": "v2"}, then
* the final row will be "tag" : {"k2": "v2"}
*
* @param documents the batch
* @return true if the operation succeeded, even partially.
*/
default boolean bulkCreateOrReplace(Map<Key, Document> documents) {
throw new UnsupportedOperationException("bulkCreateOrReplace is not supported");
}

/**
* Atomically create a new document if the key does not exist in the collection or, replace the
* existing document if the key exists in the collection and return the created/replaced document
Expand Down
Loading
Loading