Extend TransactionalBulkWriter with Additional Write Strategies#48422
Pull request overview
This PR extends the Cosmos Spark connector’s transactional bulk support by adding more write strategies beyond pure upsert, and introduces a marker-document mechanism to disambiguate ambiguous retry outcomes for transactional batches. It also adjusts patch immutability checks and expands test coverage (unit + integration/e2e).
Changes:
- Add transactional support for additional write strategies (append/create, delete, conditional delete/replace, patch) and enhance retry/ignore handling.
- Introduce per-batch marker documents (with TTL + best-effort cleanup) to verify commit vs rollback on ambiguous retries.
- Update/expand integration and E2E tests to cover the newly supported strategies and marker behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala | Adds multi-strategy transactional batch construction, marker-based commit verification, recovery changes, and retry/ignore logic updates. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosWriterBase.scala | Passes PartitionKeyDefinition into TransactionalBulkWriter for marker construction/patch behavior. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosPatchHelper.scala | Changes PK-path immutability check to use exact match rather than substring match. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala | Adds config for marker TTL and relaxes transactional write-strategy validation. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala | Updates integration tests to assert transactional acceptance for ItemAppend/Delete/OverwriteIfNotModified. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/SparkE2ETransactionalBulkWriterITest.scala | Adds E2E transactional tests per strategy + atomicity/error cases + marker cleanup verification. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBulkWriterSpec.scala | Adds a large unit-test-style spec covering strategy mapping, retry/ignore patterns, marker patterns, and PK keying behavior assumptions. |
```scala
log.logError(s"Partition key value '$partitionKeyString' has already been scheduled in this writer instance. " +
  s"This indicates a bug in the data distribution or ordering pipeline. " +
  s"Atomicity guarantee may be violated for this partition key value. " +
  s"Context: ${operationContext.toString} $getThreadInfo")
```
The current design requires creating and deleting a marker document in the user's container. From a design perspective, I think this is a big NO: we should not create and delete items in a user's production container from the SDK. This can cause a lot of issues — an RU spike; an id conflicting with an existing real item (which means we could accidentally delete customer documents); and an unknown-format document could crash a customer's application.
```scala
return Some(candidateIndices.head -> reconstructionAction.get)
}

// Deterministic tie-breakers for exception-only paths when there is no per-item
// batch response available. This preserves forward progress while keeping strategy
// semantics stable in practice for transactional E2E recovery scenarios.
val selectedIndexOpt = itemWriteStrategy match {
  case ItemWriteStrategy.ItemPatchIfExists =>
    Some(candidateIndices.last)

  case ItemWriteStrategy.ItemAppend |
       ItemWriteStrategy.ItemDelete |
       ItemWriteStrategy.ItemDeleteIfNotModified |
       ItemWriteStrategy.ItemOverwriteIfNotModified =>
    Some(candidateIndices.head)
```
🟡 Fallback Reconstruction Tie-Breaking — deterministic .head may select wrong item
When exception-only responses occur with multiple candidates, getFallbackReconstructionDecision uses .head / .last as tie-breakers (lines 1437, 1451). If the wrong item is selected, it gets reconstructed instead of the actual failing item.
This wastes a retry cycle but doesn't corrupt data (bounded by batch retry count). Consider returning None for ambiguous Remove reconstructions to be more conservative — skip the reconstruction rather than guess.
🤖 This comment was generated by an AI code review agent.
```scala
// Centralizes strategy-specific "original operation" construction so initial-batch creation and
// reconstructed-batch creation stay behaviorally identical for all supported strategies.
private def addOriginalOperationToBatch(
```
🟡 ItemBulkUpdate Configuration Gap
The PR removed the assertion blocking non-ItemOverwrite strategies, but addOriginalOperationToBatch still throws for ItemBulkUpdate (this method doesn't have a case for it). This means ItemBulkUpdate will fail at first batch write rather than at config time.
Consider adding config-time validation to fail fast with a clear error message instead of a runtime MatchError.
🤖 This comment was generated by an AI code review agent.
```scala
// Marker Cleanup Verification
// =====================================================

"transactional write marker cleanup" should "not leave marker documents after successful write" in {
```
🟡 Stale Marker Test — PR description documents removed feature
~60% of the PR description documents a "batch marker document" mechanism that was removed in commit b2e9a9731d8. This test still exists but passes trivially since no markers are created by the current implementation.
Please update the PR description to match the actual implementation, and either remove this test or update it to reflect current behavior.
🤖 This comment was generated by an AI code review agent.
There are no new comments generated from the review agent.
/azp run java - cosmos - tests

Azure Pipelines successfully started running 1 pipeline(s).
Until now, TransactionalBulkWriter in the Cosmos DB Spark Connector only worked with a single write strategy — ItemOverwrite. Every document was sent as an upsert, and if the batch failed, there was nothing nuanced to reconstruct. That simplicity broke down the moment we needed deletes, conditional writes, or patches inside a transactional batch.
This PR extends TransactionalBulkWriter to support the full set of write strategies.
A Cosmos transactional batch is all-or-nothing: if any single operation in the batch fails, the entire batch is rolled back and none of the operations take effect. But the failure of one operation doesn't always mean the batch is broken — sometimes it just means the document's state changed between when we read it and when we tried to write it. A document we tried to create already exists, or a document we tried to delete is already gone. The remaining operations in the batch are still perfectly valid.
Reconstruction is how the writer deals with this. When a batch fails because of one operation's conflict with reality, the writer rebuilds the batch — swapping the offending operation for something harmless (like a read) or dropping it entirely — and resubmits. The rest of the operations get their chance to execute. Without reconstruction, the entire batch would fail permanently even though only one operation was the problem.
Multi-strategy transactional batches:
Each write strategy now maps to a specific Cosmos transactional batch operation and defines its own reconstruction behavior when a batch partially fails:
ItemBulkUpdate is not supported in transactional mode and is now rejected at configuration time (see below).
Strategy examples:
ItemOverwrite — Upsert (fire-and-forget)
Batch contains: [upsert A, upsert B, upsert C]. If the batch fails with a transient error (e.g., 408 or 503), the entire batch is retried as-is — no reconstruction is needed because upsert is idempotent.
ItemAppend — Create, tolerate existing items
Batch contains: [create A, create B, create C]. Document B was already created by a prior attempt or an external process. The batch fails because the operation on B returns 409 Conflict. The other operations return 424 Failed Dependency (not attempted). Reconstruction changes B's operation from createItemOperation to readItemOperation, and the batch is resubmitted as [create A, read B, create C]. The read is a harmless no-op that keeps the batch structurally valid so A and C can be created.
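The create-to-read swap can be sketched in a few lines of Java (the `Op`/`OpKind` types and `reconstructAfterConflict` are hypothetical illustrations, not the connector's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the connector's actual types: a batch slot holds an
// operation kind plus a document id, and reconstruction swaps the conflicting
// create for a read so the remaining creates can still commit atomically.
class AppendReconstruction {
    enum OpKind { CREATE, READ }

    record Op(OpKind kind, String id) {}

    // Rebuild the batch after a 409 Conflict on the operation at failedIndex:
    // the create that conflicted becomes a harmless read of the same id.
    static List<Op> reconstructAfterConflict(List<Op> batch, int failedIndex) {
        List<Op> rebuilt = new ArrayList<>(batch);
        rebuilt.set(failedIndex, new Op(OpKind.READ, rebuilt.get(failedIndex).id()));
        return rebuilt;
    }
}
```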
ItemDelete — Delete, tolerate missing items
Batch contains: [delete A, delete B]. Document A has already been deleted. The batch fails because the operation on A returns 404/0 Not Found. Reconstruction removes A from the batch entirely, and the batch is resubmitted as [delete B]. If the reconstructed batch is empty (all items were already gone), it is treated as a trivial success.
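The drop-and-check behavior can be sketched as follows (illustrative names only, not the connector's code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: for ItemDelete, a 404 means the target is already gone,
// so the offending operation is dropped from the batch; if nothing is left,
// the batch counts as a trivial success.
class DeleteReconstruction {
    // Drop the delete whose target no longer exists and return the rebuilt batch.
    static List<String> dropAlreadyDeleted(List<String> deleteIds, int failedIndex) {
        List<String> rebuilt = new ArrayList<>(deleteIds);
        rebuilt.remove(failedIndex);
        return rebuilt;
    }

    // An empty rebuilt batch means every target was already deleted.
    static boolean isTrivialSuccess(List<String> rebuilt) {
        return rebuilt.isEmpty();
    }
}
```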
ItemDeleteIfNotModified — Conditional delete with ETag
Batch contains: [delete A (ETag: "e1"), delete B (ETag: "e2")]. Document A was modified since we read it — the delete returns 412 Precondition Failed. Reconstruction removes A from the batch (we intentionally skip the delete since the precondition was not met), and the batch is resubmitted as [delete B (ETag: "e2")]. A 404/0 on a conditional delete is handled identically — the item is already gone.
ItemOverwriteIfNotModified — Conditional replace / create hybrid
Items with an ETag are sent as replaceItemOperation with If-Match; items without an ETag are sent as createItemOperation. Batch: [replace A (ETag: "e1"), create B].
• If A returns 412 Precondition Failed (modified since read) -> reconstruct A as Read.
• If B returns 409 Conflict (created externally) -> reconstruct B as Read.
• If A returns 404/0 (deleted between read and write) -> reconstruct A as Remove.
Resubmitted batch after a 412 on A: [read A, create B].
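The mapping above can be condensed into a small helper (a hypothetical sketch with illustrative names; the connector's real code operates on SDK operation objects):

```java
// Hypothetical sketch of the ItemOverwriteIfNotModified mapping: the presence
// of an ETag selects the initial operation, and the failure status code
// selects the reconstruction action.
class OverwriteIfNotModifiedMapping {
    enum Action { READ, REMOVE, NONE }

    // Items with an ETag become conditional replaces; items without become creates.
    static String initialOperation(boolean hasEtag) {
        return hasEtag ? "replace(If-Match)" : "create";
    }

    // Map a failed operation's status code to its reconstruction action.
    static Action reconstructionAction(int statusCode) {
        return switch (statusCode) {
            case 412 -> Action.READ;      // modified since read: keep the slot as a read
            case 409 -> Action.READ;      // created externally: read instead of create
            case 404, 0 -> Action.REMOVE; // deleted between read and write: drop the slot
            default -> Action.NONE;       // not reconstruction-eligible
        };
    }
}
```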
ItemPatch — Partial update (fire-and-forget)
Batch contains: [patch A (set /color = "red"), patch B (increment /count)]. No reconstruction is needed — if the batch fails with a transient error it is retried as-is.
ItemPatchIfExists — Patch only if the document exists
Batch contains: [patch A (set /color = "red"), patch B (set /color = "blue")]. Document A doesn't exist — the operation returns 404/0. Reconstruction removes A from the batch (missing documents are a no-op success), and the batch is resubmitted as [patch B (set /color = "blue")].
Transient errors vs. semantic errors:
Not every batch failure triggers reconstruction. The writer distinguishes between two categories of errors:
Semantic errors (reconstruction-eligible)
These indicate a logical conflict between the operation and the current state of the document. The batch cannot succeed by simply retrying — the offending operation must be modified.
Reconstruction does not consume a retry attempt — it fixes the batch shape and resubmits immediately.
Transient errors (retry-eligible)
These indicate an infrastructure or throttling issue that may resolve on its own. The batch is resubmitted unchanged (with its current reconstruction state preserved).
For ItemOverwrite only, 404/0 is also treated as transient (rare race condition with TTL expiration).
Transient retries do consume the retry counter (attemptNumber). If transient retries exhaust maxRetryCount, the writer throws BulkOperationFailedException. Errors that are neither transient nor reconstruction-eligible — for example, a 400 Bad Request caused by a malformed document — fail immediately on the first attempt without retrying.
Decision flow on batch failure
```mermaid
flowchart TD
    A[Batch fails]
    A --> B{Per item results available}
    B -->|Yes| C[Find first non 424 result]
    C --> D{Reconstruction eligible?}
    D -->|Yes| E[Reconstruct & resubmit - no retry consumed]
    D -->|No| H[Fall through to transient retry]
    B -->|No| F{Exception only with no per item results}
    F -->|Yes| G{Fallback reconstruction possible?}
    G -->|Yes| E
    G -->|No| H
    H --> I{Transient and retries remaining?}
    I -->|Yes| J[Resubmit same batch increment attempt]
    I -->|No| K[BulkOperationFailedException]
```

A batch can go through both paths across retries. For example: the first attempt hits a 409 (reconstructed), the second attempt of the reconstructed batch hits a 503 (transient retry), and the third attempt succeeds.
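The core of this flow can be sketched as a single decision function (a hypothetical condensation with illustrative names; the real writer tracks per-batch state, logging, and the reconstruction details):

```java
// Hypothetical sketch: reconstruction never consumes a retry attempt, transient
// retries do, and anything neither reconstruction-eligible nor transient fails
// immediately.
class BatchFailureDecision {
    enum Outcome { RECONSTRUCT_AND_RESUBMIT, RETRY_SAME_BATCH, FAIL }

    static Outcome decide(boolean reconstructionEligible,
                          boolean transientError,
                          int attemptNumber,
                          int maxRetryCount) {
        if (reconstructionEligible) {
            // Fix the batch shape and resubmit immediately; no attempt consumed.
            return Outcome.RECONSTRUCT_AND_RESUBMIT;
        }
        if (transientError && attemptNumber < maxRetryCount) {
            // Resubmit the same batch (reconstruction state preserved); attempt consumed.
            return Outcome.RETRY_SAME_BATCH;
        }
        // Retries exhausted, or the error is neither transient nor
        // reconstruction-eligible (e.g. a 400 from a malformed document).
        return Outcome.FAIL;
    }
}
```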
How reconstruction works
When a transactional batch fails, the writer needs to figure out which operation caused the failure and what to do about it before resubmitting.
There are two paths depending on what the SDK returns:
Path A — Per-item results available (preferred). The CosmosBatchResponse includes individual status codes for each operation. The writer finds the first non-success result (skipping 424 Failed Dependency responses, which are just downstream casualties), maps its status code and strategy to a reconstruction action (Read or Remove), and rebuilds the batch. This path is deterministic and safe.
Path B — Exception-only fallback. In some SDK code paths, the batch returns only a CosmosException with no per-item breakdown. Here the writer has to infer which operation(s) could have caused that status code, factoring in each operation's strategy and whether it carried an ETag. This is inherently less precise — and that's where the safety fix below comes in.
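Path A's scan can be sketched like this (a hypothetical illustration that works on raw status codes for brevity; the real code inspects the SDK's per-item batch results):

```java
import java.util.OptionalInt;

// Hypothetical sketch of the Path A scan: find the first per-item result that
// is neither a success nor a 424 Failed Dependency (424 only marks operations
// that were never attempted because an earlier one failed).
class FirstFailureScan {
    static OptionalInt firstRealFailure(int[] statusCodes) {
        for (int i = 0; i < statusCodes.length; i++) {
            int status = statusCodes[i];
            boolean success = status >= 200 && status < 300;
            if (!success && status != 424) {
                return OptionalInt.of(i); // the operation that actually failed
            }
        }
        return OptionalInt.empty(); // no real failure found
    }
}
```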
Handling ambiguity in the exception-only fallback
In the exception-only fallback path (Path B above), the writer knows the status code but not which operation caused it. When multiple operations in the batch could plausibly have produced that status code, the writer has to decide how to proceed.
Consider a batch with two delete operations on the same partition key, both carrying ETags:
op0 = delete document A (ETag: "abc")
op1 = delete document B (ETag: "xyz")
The batch fails with 404/0 (Not Found), but the SDK only gives us the exception — no per-item results. Both op0 and op1 are valid candidates for that status code.
If the writer guessed wrong and reconstructed the wrong operation as Remove (meaning: "this item is already gone, drop it from the batch"), the other operation — the one that actually caused the failure — would silently disappear from all future retries. That's data loss by omission.
To prevent this, the writer distinguishes between destructive and non-destructive reconstruction actions:
Remove (destructive): When multiple candidates match and the reconstruction action is Remove, the writer skips reconstruction entirely and lets the normal retry/failure policy handle it. This avoids the risk of dropping the wrong operation.
Read (non-destructive): When multiple candidates match and the reconstruction action is Read, the writer uses deterministic tie-breaking (picks the first candidate). Guessing wrong here is harmless — it just adds an extra read to the batch without losing any operation.
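The rule can be sketched as follows (hypothetical names; the connector's real tie-breaking also factors in the write strategy):

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: with a single candidate the choice is unambiguous; with
// several, a destructive Remove is never guessed, while a non-destructive Read
// deterministically picks the first candidate.
class FallbackTieBreak {
    enum Action { READ, REMOVE }

    // Returns the candidate index to reconstruct, or empty to skip
    // reconstruction and fall back to the normal retry/failure policy.
    static Optional<Integer> select(List<Integer> candidateIndices, Action action) {
        if (candidateIndices.isEmpty()) {
            return Optional.empty();
        }
        if (candidateIndices.size() == 1) {
            return Optional.of(candidateIndices.get(0));
        }
        if (action == Action.REMOVE) {
            // Guessing wrong would silently drop the other operation: data loss.
            return Optional.empty();
        }
        // An extra read is harmless, so the first candidate is picked.
        return Optional.of(candidateIndices.get(0));
    }
}
```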