diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index f6c93cf4..9ca1e31e 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -1465,6 +1465,245 @@ void testBulkUpsertAndReturnOlderDocumentsUpsertFailure() throws Exception { } } } + + @Test + @DisplayName("Should bulk createOrReplace multiple new documents") + void testBulkCreateOrReplaceNewDocuments() throws Exception { + Map bulkMap = new LinkedHashMap<>(); + + ObjectNode node1 = OBJECT_MAPPER.createObjectNode(); + node1.put("item", "BulkReplaceItem1"); + node1.put("price", 201); + node1.put("quantity", 10); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"), new JSONDocument(node1)); + + ObjectNode node2 = OBJECT_MAPPER.createObjectNode(); + node2.put("item", "BulkReplaceItem2"); + node2.put("price", 202); + node2.put("quantity", 20); + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"), new JSONDocument(node2)); + + boolean result = flatCollection.bulkCreateOrReplace(bulkMap); + + assertTrue(result); + + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "bulk-replace-1"), + rs -> { + assertTrue(rs.next()); + assertEquals("BulkReplaceItem1", rs.getString("item")); + assertEquals(201, rs.getInt("price")); + assertEquals(10, rs.getInt("quantity")); + }); + + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "bulk-replace-2"), + rs -> { + assertTrue(rs.next()); + assertEquals("BulkReplaceItem2", rs.getString("item")); + assertEquals(202, rs.getInt("price")); + assertEquals(20, rs.getInt("quantity")); + }); + } + + @Test + @DisplayName( + "Should bulk createOrReplace replacing existing documents and reset missing cols to default") + void 
testBulkCreateOrReplaceResetsUnspecifiedColumnsToDefault() throws Exception { + // First create documents with multiple fields + String docId1 = "bulk-replace-reset-1"; + String docId2 = "bulk-replace-reset-2"; + + ObjectNode initial1 = OBJECT_MAPPER.createObjectNode(); + initial1.put("item", "Original1"); + initial1.put("price", 100); + initial1.put("quantity", 50); + initial1.put("in_stock", true); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1)); + + ObjectNode initial2 = OBJECT_MAPPER.createObjectNode(); + initial2.put("item", "Original2"); + initial2.put("price", 200); + initial2.put("quantity", 75); + initial2.put("in_stock", false); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2)); + + // Now bulk createOrReplace with only some fields - others should be RESET to default + Map bulkMap = new LinkedHashMap<>(); + + ObjectNode updated1 = OBJECT_MAPPER.createObjectNode(); + updated1.put("item", "Updated1"); + // price, quantity, in_stock are NOT specified - should be reset to NULL + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1)); + + ObjectNode updated2 = OBJECT_MAPPER.createObjectNode(); + updated2.put("item", "Updated2"); + updated2.put("price", 999); + // quantity, in_stock are NOT specified - should be reset to NULL + bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2)); + + boolean result = flatCollection.bulkCreateOrReplace(bulkMap); + + assertTrue(result); + + // Verify doc1: item updated, other fields reset to NULL + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId1), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated1", rs.getString("item")); + assertNull(rs.getObject("price")); // RESET to NULL + assertNull(rs.getObject("quantity")); // RESET to NULL + assertNull(rs.getObject("in_stock")); // RESET to NULL + }); + + // Verify doc2: item and price 
updated, other fields reset to NULL + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId2), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated2", rs.getString("item")); + assertEquals(999, rs.getInt("price")); + assertNull(rs.getObject("quantity")); // RESET to NULL + assertNull(rs.getObject("in_stock")); // RESET to NULL + }); + } + + @Test + @DisplayName("bulkUpsert vs bulkCreateOrReplace: upsert preserves, createOrReplace resets") + void testBulkUpsertVsBulkCreateOrReplaceBehavior() throws Exception { + // Setup: Create two identical documents + String docId1 = "bulk-compare-upsert"; + String docId2 = "bulk-compare-replace"; + + ObjectNode initial = OBJECT_MAPPER.createObjectNode(); + initial.put("item", "Original Item"); + initial.put("price", 100); + initial.put("quantity", 50); + + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial.deepCopy())); + flatCollection.createOrReplace( + new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial.deepCopy())); + + // Now update both with partial documents (only item field) + ObjectNode partialUpdate = OBJECT_MAPPER.createObjectNode(); + partialUpdate.put("item", "Updated Item"); + + // Use bulkUpsert for doc1 - should PRESERVE price and quantity + Map upsertMap = new LinkedHashMap<>(); + upsertMap.put( + new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(partialUpdate.deepCopy())); + flatCollection.bulkUpsert(upsertMap); + + // Use bulkCreateOrReplace for doc2 - should RESET price and quantity to NULL + Map replaceMap = new LinkedHashMap<>(); + replaceMap.put( + new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(partialUpdate.deepCopy())); + flatCollection.bulkCreateOrReplace(replaceMap); + + // Verify bulkUpsert preserved original values + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId1), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated Item", rs.getString("item")); + assertEquals(100, 
rs.getInt("price")); // PRESERVED + assertEquals(50, rs.getInt("quantity")); // PRESERVED + }); + + // Verify bulkCreateOrReplace reset to defaults + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, docId2), + rs -> { + assertTrue(rs.next()); + assertEquals("Updated Item", rs.getString("item")); + assertNull(rs.getObject("price")); // RESET to NULL + assertNull(rs.getObject("quantity")); // RESET to NULL + }); + } + + @Test + @DisplayName("Should handle empty document map for bulkCreateOrReplace") + void testBulkCreateOrReplaceEmptyMap() { + Map emptyMap = Collections.emptyMap(); + boolean result = flatCollection.bulkCreateOrReplace(emptyMap); + assertTrue(result); + } + + @Test + @DisplayName("Should handle null document map for bulkCreateOrReplace") + void testBulkCreateOrReplaceNullMap() { + boolean result = flatCollection.bulkCreateOrReplace(null); + assertTrue(result); + } + + @Test + @DisplayName("Should skip unknown fields in bulkCreateOrReplace") + void testBulkCreateOrReplaceSkipsUnknownFields() throws Exception { + Map bulkMap = new LinkedHashMap<>(); + + ObjectNode node = OBJECT_MAPPER.createObjectNode(); + node.put("item", "ItemWithUnknown"); + node.put("price", 300); + node.put("unknown_field", "should be skipped"); + bulkMap.put( + new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"), new JSONDocument(node)); + + boolean result = flatCollection.bulkCreateOrReplace(bulkMap); + + assertTrue(result); + + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "bulk-replace-unknown"), + rs -> { + assertTrue(rs.next()); + assertEquals("ItemWithUnknown", rs.getString("item")); + assertEquals(300, rs.getInt("price")); + }); + } + + @Test + @DisplayName( + "Should ignore documents with unknown fields when IGNORE_DOCUMENT strategy for bulkCreateOrReplace") + void testBulkCreateOrReplaceIgnoreDocumentStrategy() throws Exception { + Collection collectionWithIgnoreStrategy = + getFlatCollectionWithStrategy(MissingColumnStrategy.IGNORE_DOCUMENT.toString()); 
+ + Map bulkMap = new LinkedHashMap<>(); + + // Document with unknown field - should be ignored + ObjectNode nodeWithUnknown = OBJECT_MAPPER.createObjectNode(); + nodeWithUnknown.put("item", "ItemWithUnknown"); + nodeWithUnknown.put("unknown_field", "should cause document to be ignored"); + bulkMap.put( + new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"), + new JSONDocument(nodeWithUnknown)); + + // Valid document - should be inserted + ObjectNode validNode = OBJECT_MAPPER.createObjectNode(); + validNode.put("item", "ValidItem"); + validNode.put("price", 200); + bulkMap.put( + new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"), new JSONDocument(validNode)); + + boolean result = collectionWithIgnoreStrategy.bulkCreateOrReplace(bulkMap); + + assertTrue(result); + + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "ignore-replace-1"), rs -> assertFalse(rs.next())); + + queryAndAssert( + new SingleValueKey(DEFAULT_TENANT, "ignore-replace-2"), + rs -> { + assertTrue(rs.next()); + assertEquals("ValidItem", rs.getString("item")); + assertEquals(200, rs.getInt("price")); + }); + } } @Nested diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java index 4d0430fb..66cd6b6c 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/Collection.java @@ -15,19 +15,16 @@ /** Interface spec for common operations on a collection of documents */ public interface Collection { + /** * Upsert (create a new doc or update if one already exists) the given document into the doc * store. * *

Note: This method ensures that all the fields defined in the `Document` are set/created. How * the existing fields are modified is implementation specific. For example, upserting - * { - * "foo2": "bar2" - * } + * { "foo2": "bar2" } * if a document - * { - * "foo1": "bar1" - * } + * { "foo1": "bar1" } * already exists would ensure that "foo2" is set the value of "bar2" and what happens to * the "foo1" field is implementation specific * @@ -46,13 +43,9 @@ public interface Collection { * *

Note: This method ensures that all the fields defined in the `Document` are set/created. How * the existing fields are modified is implementation specific. For example, upserting - * { - * "foo2": "bar2" - * } + * { "foo2": "bar2" } * if a document - * { - * "foo1": "bar1" - * } + * { "foo1": "bar1" } * already exists would ensure that "foo2" is set the value of "bar2" and what happens to * the "foo1" field is implementation specific * * @@ -104,10 +97,10 @@ public interface Collection { /** * Search for documents matching the query. * - * @deprecated Use {@link #query(org.hypertrace.core.documentstore.query.Query, QueryOptions)} - * instead * @param query filter to query matching documents * @return {@link CloseableIterator} of matching documents + * @deprecated Use {@link #query(org.hypertrace.core.documentstore.query.Query, QueryOptions)} + * instead */ @Deprecated(forRemoval = true) CloseableIterator search(Query query); @@ -268,6 +261,30 @@ CloseableIterator bulkUpsertAndReturnOlderDocuments(Map */ boolean createOrReplace(final Key key, final Document document) throws IOException; + /** + * Bulk createOrReplace with no atomicity guarantee. If partial documents succeed, the operation + * is not rolled back. It's possible that certain documents are ignored, if they contain columns + * that are not present in the table's schema. This happens when the missingColumnStrategy is + * configured to {@link + * org.hypertrace.core.documentstore.model.options.MissingColumnStrategy#IGNORE_DOCUMENT}. If it's + * configured to {@link + * org.hypertrace.core.documentstore.model.options.MissingColumnStrategy#SKIP}, then that column + * is skipped (but the document is still created/replaced). If it's configured to be {@link + * org.hypertrace.core.documentstore.model.options.MissingColumnStrategy#THROW}, the entire batch + * fails. + * + *

Semantically, if the document already exists, each column is replaced with its new value (or + * to its default value if not specified). Note that no merge happens. For example, if the + * original row contains "tag" : {"k1": "v1"} and the new row contains "tag" : {"k2": "v2"}, then + * the final row will be "tag" : {"k2": "v2"} + * + * @param documents the batch + * @return true if the operation succeeded, even partially. + */ + default boolean bulkCreateOrReplace(Map documents) { + throw new UnsupportedOperationException("bulkCreateOrReplace is not supported"); + } + /** * Atomically create a new document if the key does not exist in the collection or, replace the * existing document if the key exists in the collection and return the created/replaced document diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 6a04b694..aa64fa44 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -419,6 +419,109 @@ public boolean bulkUpsert(Map documents) { return false; } + @Override + public boolean bulkCreateOrReplace(Map documents) { + if (documents == null || documents.isEmpty()) { + return true; + } + + String tableName = tableIdentifier.getTableName(); + String pkColumn = getPKForTable(tableName); + String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn); + PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn); + + try { + // Parse all documents and collect the union of all columns from documents + Map parsedDocuments = new LinkedHashMap<>(); + Set docColumns = new LinkedHashSet<>(); + docColumns.add(quotedPkColumn); + + List ignoredDocuments = new ArrayList<>(); + for (Map.Entry entry : 
documents.entrySet()) { + List skippedFields = new ArrayList<>(); + TypedDocument parsed = parseDocument(entry.getValue(), tableName, skippedFields); + + // Handle IGNORE_DOCUMENT strategy: skip docs with unknown fields + if (missingColumnStrategy == MissingColumnStrategy.IGNORE_DOCUMENT + && !skippedFields.isEmpty()) { + ignoredDocuments.add(entry.getKey()); + continue; + } + + parsed.add(quotedPkColumn, entry.getKey().toString(), pkType, false); + parsedDocuments.put(entry.getKey(), parsed); + docColumns.addAll(parsed.getColumns()); + } + + if (!ignoredDocuments.isEmpty()) { + LOGGER.info( + "bulkCreateOrReplace: Ignored {} documents due to IGNORE_DOCUMENT strategy. Keys: {}", + ignoredDocuments.size(), + ignoredDocuments); + } + + // If all documents were ignored, return true (nothing to do) + if (parsedDocuments.isEmpty()) { + return true; + } + + // Get all table columns for the replace semantics (missing cols set to DEFAULT) + List allTableColumns = + schemaRegistry.getSchema(tableName).values().stream() + .map(PostgresColumnMetadata::getName) + .map(PostgresUtils::wrapFieldNamesWithDoubleQuotes) + .collect(Collectors.toList()); + + // Build the bulk createOrReplace SQL - uses all table columns with DEFAULT for missing + List docColumnList = new ArrayList<>(docColumns); + String sql = buildCreateOrReplaceSql(allTableColumns, docColumnList, quotedPkColumn); + LOGGER.debug("Bulk createOrReplace SQL: {}", sql); + + try (Connection conn = client.getPooledConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + + for (Map.Entry entry : parsedDocuments.entrySet()) { + TypedDocument parsed = entry.getValue(); + int index = 1; + + for (String column : docColumnList) { + if (parsed.getColumns().contains(column)) { + setParameter( + conn, + ps, + index++, + parsed.getValue(column), + parsed.getType(column), + parsed.isArray(column)); + } else { + ps.setObject(index++, null); + } + } + ps.addBatch(); + } + + int[] results = ps.executeBatch(); + if 
(LOGGER.isDebugEnabled()) { + LOGGER.debug("Bulk createOrReplace results: {}", Arrays.toString(results)); + } + return true; + } + + } catch (BatchUpdateException e) { + LOGGER.error("BatchUpdateException in bulkCreateOrReplace", e); + } catch (SQLException e) { + LOGGER.error( + "SQLException in bulkCreateOrReplace. SQLState: {} Error Code: {}", + e.getSQLState(), + e.getErrorCode(), + e); + } catch (IOException e) { + LOGGER.error("IOException in bulkCreateOrReplace. documents: {}", documents, e); + } + + return false; + } + /** * Builds a PostgreSQL upsert SQL statement with merge semantics. *