From a13e30897eafa69b53588374885eab47c46a7099 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Mon, 18 Aug 2025 17:30:39 -0400 Subject: [PATCH 01/20] Adding a way to avoid barrier calls when at least 1 response in quorum read flow has 410-1022. --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 7 ++- .../cosmos/implementation/Exceptions.java | 5 ++ .../directconnectivity/QuorumReader.java | 54 ++++++++++++++++--- .../directconnectivity/StoreReader.java | 19 ++++++- .../directconnectivity/StoreResult.java | 2 + 5 files changed, 76 insertions(+), 11 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 6869aeda23e6..260c568ec78b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -155,13 +155,11 @@ public void beforeClass() { this.clientWithPreferredRegions = getClientBuilder() .preferredRegions(this.preferredRegions) - .consistencyLevel(ConsistencyLevel.SESSION) .endpointDiscoveryEnabled(true) .multipleWriteRegionsEnabled(true) .buildAsyncClient(); this.clientWithoutPreferredRegions = getClientBuilder() - .consistencyLevel(ConsistencyLevel.SESSION) .endpointDiscoveryEnabled(true) .multipleWriteRegionsEnabled(true) .buildAsyncClient(); @@ -476,7 +474,7 @@ public void dataPlaneRequestHttpTimeout( } } - @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider", timeOut = TIMEOUT) + @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider"/*, timeOut = TIMEOUT*/) public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, @@ -513,16 +511,17 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( new FaultInjectionConditionBuilder() .operationType(faultInjectionOperationType) .connectionType(FaultInjectionConnectionType.DIRECT) + .region(this.preferredRegions.get(0)) .build()) .result( FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.LEASE_NOT_FOUND) .build() ) .duration(Duration.ofMinutes(5)) - .hitLimit(1) .build(); CosmosAsyncClient testClient = getClientBuilder() + .consistencyLevel(ConsistencyLevel.STRONG) .preferredRegions(shouldUsePreferredRegionsOnClient ? this.preferredRegions : Collections.emptyList()) .directMode() .buildAsyncClient(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java index a77d84d9d7f2..049603aa6f26 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java @@ -65,4 +65,9 @@ public static boolean isStaledResourceException(int statusCode, int subStatusCod || (statusCode == HttpConstants.StatusCodes.GONE && subStatusCode == HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE); } + + public static boolean isAvoidQuorumSelectionException(CosmosException cosmosException) { + return Exceptions.isStatusCode(cosmosException, HttpConstants.StatusCodes.GONE) + && Exceptions.isSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index f43e3bcb02a7..f704cfe20943 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -30,6 +30,7 @@ import java.time.Duration; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -153,8 +154,13 @@ public Mono readStrongAsync( secondaryQuorumReadResult -> { switch (secondaryQuorumReadResult.quorumResult) { + case QuorumNotPossibleInCurrentRegion: + logger.info("QuorumNotPossibleInCurrentRegion: ReadQuorumResult StoreResponses: {}", + String.join(";", secondaryQuorumReadResult.storeResponses)); + return Flux.error(secondaryQuorumReadResult.failFastException); case QuorumMet: try { + logger.warn("QuorumMet: ReadQuorumResult StoreResponses: {}", String.join(";", secondaryQuorumReadResult.storeResponses)); return Flux.just(secondaryQuorumReadResult.getResponse()); } catch (CosmosException e) { return Flux.error(e); @@ -302,6 +308,7 @@ private Mono readQuorumAsync( return ensureQuorumSelectedStoreResponse(entity, readQuorum, includePrimary, readMode).flatMap( res -> { + if (res.getLeft() != null) { // no need for barrier return Mono.just(res.getKey()); @@ -326,7 +333,8 @@ private Mono readQuorumAsync( readLsn, globalCommittedLSN, storeResult, - storeResponses)); + storeResponses, + null)); } return Mono.just(new ReadQuorumResult( @@ -335,7 +343,8 @@ private Mono readQuorumAsync( readLsn, globalCommittedLSN, storeResult, - storeResponses)); + storeResponses, + null)); } ); } @@ -368,11 +377,37 @@ private Mono firstStoreResultWithIsAvoidQuorumSelectionException = responseResult + .stream() + .filter(response -> response.isAvoidQuorumSelectionException) + .findFirst(); + + if (firstStoreResultWithIsAvoidQuorumSelectionException.isPresent()) { + + StoreResult storeResult = firstStoreResultWithIsAvoidQuorumSelectionException.get(); + String message = String.format( + "At least one replica returned an exception because of which quorum cannot be selected in region for partitonId. Responses: %s", + String.join(";", storeResponses)); + + logger.warn(message); + + return Mono.just(Pair.of(new ReadQuorumResult( + entity.requestContext.requestChargeTracker, + ReadQuorumResultKind.QuorumNotPossibleInCurrentRegion, + -1, + -1, + null, + storeResponses, + storeResult.getException()), null)); + } + int responseCount = (int) responseResult.stream().filter(response -> response.isValid).count(); + if (responseCount < readQuorum) { return Mono.just(Pair.of(new ReadQuorumResult(entity.requestContext.requestChargeTracker, ReadQuorumResultKind.QuorumNotSelected, - -1, -1, null, storeResponses), null)); + -1, -1, null, storeResponses, null), null)); } //either request overrides consistency level with strong, or request does not @@ -404,7 +439,8 @@ private Mono storeResponses) { + List storeResponses, + CosmosException failFastException) { super(requestChargeTracker, selectedResponse); this.quorumResult = QuorumResult; @@ -826,6 +864,7 @@ public ReadQuorumResult( this.globalCommittedSelectedLsn = globalCommittedSelectedLsn; this.selectedResponse = selectedResponse; this.storeResponses = storeResponses; + this.failFastException = failFastException; } public final ReadQuorumResultKind quorumResult; @@ -844,6 +883,8 @@ public ReadQuorumResult( public final long globalCommittedSelectedLsn; + public final CosmosException failFastException; + protected boolean isValidResult() { return this.quorumResult == ReadQuorumResultKind.QuorumMet || this.quorumResult == ReadQuorumResultKind.QuorumSelected; } @@ -868,5 +909,6 @@ private enum PrimaryReadOutcome { QuorumNotMet, // Primary LSN is not committed. QuorumInconclusive, // Secondary replicas are available. Must read R secondary's to deduce current quorum. QuorumMet, + QuorumNotPossibleInCurrentRegion } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index 000c683b056c..9926665df6ba 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -278,6 +278,23 @@ private Flux> readFromReplicas(List resultCollect return Mono.error(e); }).map(newStoreResults -> { for (StoreResult srr : newStoreResults) { + + if (srr.isAvoidQuorumSelectionException) { + // isAvoidQuorumSelectionException is a special case where we want to prevent the enclosing data plane operation + // to fail fast in the region where a quorum selection is being attempted + if (logger.isDebugEnabled()) { + logger.debug("AvoidQuorumSelectionException encountered, returning result immediately: {}", srr); + } + + if (!entity.requestContext.performedBackgroundAddressRefresh) { + this.startBackgroundAddressRefresh(entity); + entity.requestContext.performedBackgroundAddressRefresh = true; + } + + resultCollector.add(srr); + return resultCollector; + } + if (srr.isValid) { try { @@ -1000,7 +1017,7 @@ StoreResult createStoreResult(StoreResponse storeResponse, /* currentReplicaSetSize: */ currentReplicaSetSize, /* currentWriteQuorum: */ currentWriteQuorum, /* isValid: */!requiresValidLsn - || ((cosmosException.getStatusCode() != HttpConstants.StatusCodes.GONE || isSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE) || isSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.LEASE_NOT_FOUND)) + || ((cosmosException.getStatusCode() != HttpConstants.StatusCodes.GONE || isSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE)) && lsn >= 0), // TODO: verify where exception.RequestURI is supposed to be set in .Net /* storePhysicalAddress: */ storePhysicalAddress == null diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java index 2a39a3d07497..88531ae7828e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java @@ -36,6 +36,7 @@ public class StoreResult { final public boolean isGoneException; final public boolean isNotFoundException; final public boolean isInvalidPartitionException; + final public boolean isAvoidQuorumSelectionException; final public Uri storePhysicalAddress; final public boolean isThroughputControlRequestRateTooLargeException; final public Double backendLatencyInMs; @@ -83,6 +84,7 @@ public StoreResult( this.isThroughputControlRequestRateTooLargeException = this.exception != null && Exceptions.isThroughputControlRequestRateTooLargeException(this.exception); this.backendLatencyInMs = backendLatencyInMs; this.retryAfterInMs = retryAfterInMs; + this.isAvoidQuorumSelectionException = this.exception != null && Exceptions.isAvoidQuorumSelectionException(this.exception); } public StoreResponse getStoreResponse() { From d0fcc913d9bbcb1f5ade97e0b3bf642cecee4426 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Mon, 18 Aug 2025 18:27:32 -0400 Subject: [PATCH 02/20] Adding a way to avoid barrier calls when at least 1 response in quorum read flow has 410-1022. --- .../directconnectivity/QuorumReader.java | 12 +++++------- .../directconnectivity/StoreReader.java | 10 +++++++--- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index f704cfe20943..eddde1c7fcf0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -155,7 +155,7 @@ public Mono readStrongAsync( switch (secondaryQuorumReadResult.quorumResult) { case QuorumNotPossibleInCurrentRegion: - logger.info("QuorumNotPossibleInCurrentRegion: ReadQuorumResult StoreResponses: {}", + logger.warn("QuorumNotPossibleInCurrentRegion: ReadQuorumResult StoreResponses: {}", String.join(";", secondaryQuorumReadResult.storeResponses)); return Flux.error(secondaryQuorumReadResult.failFastException); case QuorumMet: @@ -384,13 +384,11 @@ private Mono> readFromReplicas(List resultCollect for (StoreResult srr : newStoreResults) { if (srr.isAvoidQuorumSelectionException) { - // isAvoidQuorumSelectionException is a special case where we want to prevent the enclosing data plane operation + // isAvoidQuorumSelectionException is a special case where we want to enable the enclosing data plane operation // to fail fast in the region where a quorum selection is being attempted + // no attempts to reselect quorum will be made if (logger.isDebugEnabled()) { - logger.debug("AvoidQuorumSelectionException encountered, returning result immediately: {}", srr); + logger.debug(" {}", srr); } if (!entity.requestContext.performedBackgroundAddressRefresh) { @@ -292,7 +293,10 @@ private Flux> readFromReplicas(List resultCollect } resultCollector.add(srr); - return resultCollector; + + // continue to the next store result (collect quorum store results if possible) + // to not reattempt quorum selection + continue; } if (srr.isValid) { From c76ef6c105c36ee9bb77ccce9ef949d1251a74fd Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Mon, 18 Aug 2025 18:42:34 -0400 Subject: [PATCH 03/20] Adding a way to avoid barrier calls when at least 1 response in quorum read flow has 410-1022. --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 39 ++++++++++--------- .../directconnectivity/QuorumReader.java | 3 +- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 260c568ec78b..a10d82802df4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -96,25 +96,25 @@ public static Object[][] leaseNotFoundArgProvider() { return new Object[][]{ // OperationType, FaultInjectionOperationType, shouldUsePreferredRegionsOnClient, isReadMany { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true }, - { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true } +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false }, +// { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false }, +// { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false }, +// { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false }, +// { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false }, +// { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false }, +// { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false }, +// { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true }, +// { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false }, +// { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false }, +// { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false }, +// { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false }, +// { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false }, +// { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false }, +// { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false }, +// { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true } }; } @@ -518,6 +518,7 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( .build() ) .duration(Duration.ofMinutes(5)) + .hitLimit(1) .build(); CosmosAsyncClient testClient = getClientBuilder() diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index eddde1c7fcf0..9cec1fd34d8c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -160,7 +160,6 @@ public Mono readStrongAsync( return Flux.error(secondaryQuorumReadResult.failFastException); case QuorumMet: try { - logger.warn("QuorumMet: ReadQuorumResult StoreResponses: {}", String.join(";", secondaryQuorumReadResult.storeResponses)); return Flux.just(secondaryQuorumReadResult.getResponse()); } catch (CosmosException e) { return Flux.error(e); @@ -388,7 +387,7 @@ private Mono Date: Mon, 18 Aug 2025 19:03:04 -0400 Subject: [PATCH 04/20] Adding a way to avoid barrier calls when at least 1 response in quorum read flow has 410-1022. --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 40 +++++++++---------- .../directconnectivity/QuorumReader.java | 4 +- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index a10d82802df4..49a86d1ee616 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -96,25 +96,25 @@ public static Object[][] leaseNotFoundArgProvider() { return new Object[][]{ // OperationType, FaultInjectionOperationType, shouldUsePreferredRegionsOnClient, isReadMany { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false }, -// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false }, -// { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false }, -// { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false }, -// { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false }, -// { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false }, -// { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false }, -// { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false }, -// { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false }, -// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true }, -// { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false }, -// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false }, -// { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false }, -// { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false }, -// { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false }, -// { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false }, -// { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false }, -// { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false }, -// { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false }, -// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true } + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true }, + { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true } }; } @@ -474,7 +474,7 @@ public void dataPlaneRequestHttpTimeout( } } - @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider"/*, timeOut = TIMEOUT*/) + @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider", timeOut = TIMEOUT) public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index 9cec1fd34d8c..beaa44c302e6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -384,8 +384,8 @@ private Mono Date: Mon, 18 Aug 2025 19:13:07 -0400 Subject: [PATCH 05/20] Adding a way to avoid barrier calls when at least 1 response in quorum read flow has 410-1022. --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 65 ++++++++++++------- 1 file changed, 43 insertions(+), 22 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 49a86d1ee616..b6228741d867 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -95,26 +95,46 @@ public static Object[][] channelAcquisitionExceptionArgProvider() { public static Object[][] leaseNotFoundArgProvider() { return new Object[][]{ // OperationType, FaultInjectionOperationType, shouldUsePreferredRegionsOnClient, isReadMany - { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true }, - { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true } + { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false, 1 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false, 1 }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false, 1 }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false, 1 }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false, 1 }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false, 1 }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false, 1 }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false, 1 }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false, 1 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true, 1 }, + { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false, 1 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false, 1 }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false, 1 }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false, 1 }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false, 1 }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false, 1 }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false, 1 }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false, 1 }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false, 1 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true, 1 }, + { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false, 2 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false, 2 }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false, 2 }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false, 2 }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false, 2 }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false, 2 }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false, 2 }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false, 2 }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false, 2 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true, 2 }, + { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false, 2 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false, 2 }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false, 2 }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false, 2 }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false, 2 }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false, 2 }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false, 2 }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false, 2 }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false, 2 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true, 2 } }; } @@ -479,7 +499,8 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, boolean shouldUsePreferredRegionsOnClient, - boolean isReadMany) { + boolean isReadMany, + int hitLimit) { boolean shouldRetryCrossRegion = false; @@ -518,7 +539,7 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( .build() ) .duration(Duration.ofMinutes(5)) - .hitLimit(1) + .hitLimit(hitLimit) .build(); CosmosAsyncClient testClient = getClientBuilder() From 907e3b05568e31c37a49b5bdb652c48b7bd4cc74 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Tue, 19 Aug 2025 16:22:49 -0400 Subject: [PATCH 06/20] Adding a way to avoid barrier calls when at least 1 response in quorum read flow has 410-1022. --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 116 +++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index b6228741d867..7b9ad18f8ea2 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -543,7 +543,6 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( .build(); CosmosAsyncClient testClient = getClientBuilder() - .consistencyLevel(ConsistencyLevel.STRONG) .preferredRegions(shouldUsePreferredRegionsOnClient ? this.preferredRegions : Collections.emptyList()) .directMode() .buildAsyncClient(); @@ -593,6 +592,121 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( } } + @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider", timeOut = TIMEOUT) + public void dataPlaneRequestHitsLeaseNotFoundAndGoneInFirstPreferredRegion( + OperationType operationType, + FaultInjectionOperationType faultInjectionOperationType, + boolean shouldUsePreferredRegionsOnClient, + boolean isReadMany, + int hitLimit) { + + boolean shouldRetryCrossRegion = false; + + if (Utils.isWriteOperation(operationType) && this.serviceOrderedWriteableRegions.size() > 1) { + shouldRetryCrossRegion = true; + } else if (!Utils.isWriteOperation(operationType) && this.serviceOrderedReadableRegions.size() > 1) { + shouldRetryCrossRegion = true; + } + + CosmosAsyncClient resultantCosmosAsyncClient; + + if (shouldUsePreferredRegionsOnClient) { + resultantCosmosAsyncClient = this.clientWithPreferredRegions; + } else { + resultantCosmosAsyncClient = this.clientWithoutPreferredRegions; + } + + if (BridgeInternal + .getContextClient(resultantCosmosAsyncClient) + .getConnectionPolicy() + .getConnectionMode() == ConnectionMode.GATEWAY) { + throw new SkipException("leaseNotFound is only meant for Direct mode"); + } + + TestItem createdItem = TestItem.createNewItem(); + + FaultInjectionRule leaseNotFoundFaultRule = new FaultInjectionRuleBuilder("leaseNotFound-" + UUID.randomUUID()) + .condition( + new FaultInjectionConditionBuilder() + .operationType(faultInjectionOperationType) + .connectionType(FaultInjectionConnectionType.DIRECT) + .region(this.preferredRegions.get(0)) + .build()) + .result( + FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.LEASE_NOT_FOUND) + .build() + ) + .duration(Duration.ofMinutes(5)) + .hitLimit(1) + .build(); + + FaultInjectionRule goneRule = new FaultInjectionRuleBuilder("gone-" + UUID.randomUUID()) + .condition( + new FaultInjectionConditionBuilder() + .operationType(faultInjectionOperationType) + .connectionType(FaultInjectionConnectionType.DIRECT) + .region(this.preferredRegions.get(0)) + .build()) + .result( + FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.GONE) + .build() + ) + .duration(Duration.ofMinutes(5)) + .hitLimit(1) + .build(); + + CosmosAsyncClient testClient = getClientBuilder() + .preferredRegions(shouldUsePreferredRegionsOnClient ? this.preferredRegions : Collections.emptyList()) + .directMode() + .buildAsyncClient(); + + CosmosAsyncContainer testContainer = getSharedSinglePartitionCosmosContainer(testClient); + + try { + + testContainer.createItem(createdItem).block(); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(leaseNotFoundFaultRule, goneRule)).block(); + + Instant timeStart = Instant.now(); + + CosmosDiagnostics cosmosDiagnostics + = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany).block(); + + Instant timeEnd = Instant.now(); + + if (shouldRetryCrossRegion) { + assertThat(cosmosDiagnostics).isNotNull(); + assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull(); + + CosmosDiagnosticsContext diagnosticsContext = cosmosDiagnostics.getDiagnosticsContext(); + + assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(2); + assertThat(diagnosticsContext.getStatusCode()).isLessThan(HttpConstants.StatusCodes.BADREQUEST); + } else { + assertThat(cosmosDiagnostics).isNotNull(); + assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull(); + + CosmosDiagnosticsContext diagnosticsContext = cosmosDiagnostics.getDiagnosticsContext(); + + assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(1); + assertThat(diagnosticsContext.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.SERVICE_UNAVAILABLE); + assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); + } + + assertThat(Duration.between(timeStart, timeEnd)).isLessThan(Duration.ofSeconds(5)); + + } finally { + leaseNotFoundFaultRule.disable(); + + if (testClient != null) { + cleanUpContainer(testContainer); + testClient.close(); + } + } + } + + @Test(groups = { "multi-master" }, dataProvider = "channelAcquisitionExceptionArgProvider", timeOut = 8 * TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class) public void channelAcquisitionExceptionOnWrites( OperationType operationType, From 85bed422f01e60c1cd370bd9397eb7b13aa858c2 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Tue, 19 Aug 2025 17:34:31 -0400 Subject: [PATCH 07/20] Adding a way to avoid barrier calls when at least 1 response in quorum read flow has 410-1022. --- .../test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 7b9ad18f8ea2..f6b0bdc73a87 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -706,7 +706,6 @@ public void dataPlaneRequestHitsLeaseNotFoundAndGoneInFirstPreferredRegion( } } - @Test(groups = { "multi-master" }, dataProvider = "channelAcquisitionExceptionArgProvider", timeOut = 8 * TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class) public void channelAcquisitionExceptionOnWrites( OperationType operationType, From 03e27d8348ee02ee3124f07de4273a7669355002 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Tue, 19 Aug 2025 17:44:13 -0400 Subject: [PATCH 08/20] Adding a way to avoid barrier calls when at least 1 response in quorum read flow has 410-1022. --- .../implementation/directconnectivity/StoreReader.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index d35d7ff8dd2f..18fa1bffff0c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -284,7 +284,13 @@ private Flux> readFromReplicas(List resultCollect // to fail fast in the region where a quorum selection is being attempted // no attempts to reselect quorum will be made if (logger.isDebugEnabled()) { - logger.debug(" {}", srr); + + int statusCode, subStatusCode; + + statusCode = srr.getException() != null ? srr.getException().getStatusCode() : 0; + subStatusCode = srr.getException() != null ? srr.getException().getSubStatusCode() : 0; + + logger.debug("An exception with error code [{}-{}] was observed which means quorum cannot be attained in the current region!", statusCode, subStatusCode); } if (!entity.requestContext.performedBackgroundAddressRefresh) { From ad928cc0439f4c3bb258fdbfe4fe24d7f1e9546c Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Tue, 19 Aug 2025 19:41:28 -0400 Subject: [PATCH 09/20] Modifying tests. --- .../directconnectivity/StoreReaderTest.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java index 710946c2325c..27052ab0d7a5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java @@ -7,11 +7,13 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.DiagnosticsClientContext; import com.azure.cosmos.implementation.DocumentServiceRequestContext; +import com.azure.cosmos.implementation.Exceptions; import com.azure.cosmos.implementation.FailureValidator; import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ISessionContainer; import com.azure.cosmos.implementation.ISessionToken; +import com.azure.cosmos.implementation.LeaseNotFoundException; import com.azure.cosmos.implementation.NotFoundException; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionIsMigratingException; @@ -149,6 +151,7 @@ public Object[][] storeResponseArgProvider() { { new PartitionKeyRangeIsSplittingException() , null, }, { new PartitionIsMigratingException(), null, }, { new GoneException(), null, }, + { new LeaseNotFoundException(null, 0L, null, null), null }, { null, Mockito.mock(StoreResponse.class), } }; } @@ -915,9 +918,15 @@ public void storeResponseRecordedOnException(Exception ex, StoreResponse storeRe try { StoreReader.verifyCanContinueOnException((CosmosException) ex); - // for continuable exception, SDK will retry on all other replicas, so the failed endpoints should match replica counts. - List expectedFailedEndpoints = Arrays.asList(primaryUri, secondaryUri1, secondaryUri2, secondaryUri3); - assertThat(dsr.requestContext.getFailedEndpoints()).hasSize(expectedFailedEndpoints.size()).containsAll(expectedFailedEndpoints); + if (Exceptions.isAvoidQuorumSelectionException((CosmosException) ex)) { + // while the exception is continuable, it is avoid quorum selection exception, so such results are collected + // while these results are not valid, they are still collected in the failed endpoints and also contribute towards decrementing replicaCountToRead to avoid quorum reselection. + assertThat(dsr.requestContext.getFailedEndpoints().size()).isEqualTo(3); + } else { + // for continuable exception, SDK will retry on all other replicas, so the failed endpoints should match replica counts. + List expectedFailedEndpoints = Arrays.asList(primaryUri, secondaryUri1, secondaryUri2, secondaryUri3); + assertThat(dsr.requestContext.getFailedEndpoints()).hasSize(expectedFailedEndpoints.size()).containsAll(expectedFailedEndpoints); + } } catch (Exception exception) { if (exception instanceof CosmosException) { From cbf50873cc4171746d5fb2bcce2290f890439889 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Tue, 26 Aug 2025 18:42:32 -0400 Subject: [PATCH 10/20] Scope out barrier requests from the fail fast flow (for the time being). --- .../RxDocumentServiceRequest.java | 1 + .../BarrierRequestHelper.java | 2 + .../directconnectivity/StoreReader.java | 39 +++++++++++-------- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index b1619d048681..c1d2de9c8457 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -85,6 +85,7 @@ public class RxDocumentServiceRequest implements Cloneable { public volatile Map properties; public String throughputControlGroupName; public volatile boolean intendedCollectionRidPassedIntoSDK = false; + public volatile boolean isBarrierRequest = false; private volatile Duration responseTimeout; private volatile boolean nonIdempotentWriteRetriesEnabled = false; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierRequestHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierRequestHelper.java index 7f6b2e497c5d..a39546e485cf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierRequestHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierRequestHelper.java @@ -142,6 +142,8 @@ public static Mono createAsync( barrierLsnRequest.getHeaders().put(WFConstants.BackendHeaders.COLLECTION_RID, request.getHeaders().get(WFConstants.BackendHeaders.COLLECTION_RID)); } + barrierLsnRequest.isBarrierRequest = true; + if (hasAadToken) { return authorizationTokenProvider.populateAuthorizationHeader(barrierLsnRequest); } else { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index 18fa1bffff0c..9c605f5b233b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -280,29 +280,34 @@ private Flux> readFromReplicas(List resultCollect for (StoreResult srr : newStoreResults) { if (srr.isAvoidQuorumSelectionException) { - // isAvoidQuorumSelectionException is a special case where we want to enable the enclosing data plane operation - // to fail fast in the region where a quorum selection is being attempted - // no attempts to reselect quorum will be made - if (logger.isDebugEnabled()) { - int statusCode, subStatusCode; + // todo: fail fast when barrier requests also hit isAvoidQuorumSelectionException? + if (!entity.isBarrierRequest) { - statusCode = srr.getException() != null ? srr.getException().getStatusCode() : 0; - subStatusCode = srr.getException() != null ? srr.getException().getSubStatusCode() : 0; + // isAvoidQuorumSelectionException is a special case where we want to enable the enclosing data plane operation + // to fail fast in the region where a quorum selection is being attempted + // no attempts to reselect quorum will be made + if (logger.isDebugEnabled()) { - logger.debug("An exception with error code [{}-{}] was observed which means quorum cannot be attained in the current region!", statusCode, subStatusCode); - } + int statusCode, subStatusCode; - if (!entity.requestContext.performedBackgroundAddressRefresh) { - this.startBackgroundAddressRefresh(entity); - entity.requestContext.performedBackgroundAddressRefresh = true; - } + statusCode = srr.getException() != null ? srr.getException().getStatusCode() : 0; + subStatusCode = srr.getException() != null ? srr.getException().getSubStatusCode() : 0; - resultCollector.add(srr); + logger.debug("An exception with error code [{}-{}] was observed which means quorum cannot be attained in the current region!", statusCode, subStatusCode); + } + + if (!entity.requestContext.performedBackgroundAddressRefresh) { + this.startBackgroundAddressRefresh(entity); + entity.requestContext.performedBackgroundAddressRefresh = true; + } - // continue to the next store result (collect quorum store results if possible) - // to not reattempt quorum selection - continue; + resultCollector.add(srr); + + // continue to the next store result (collect quorum store results if possible) + // to not reattempt quorum selection + continue; + } } if (srr.isValid) { From 9cb31e977bcbe64c1b31527ea8c79d81bdbd785b Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Fri, 29 Aug 2025 15:49:15 -0400 Subject: [PATCH 11/20] Fixing tests --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 72 +++++++++++++++---- .../directconnectivity/QuorumReader.java | 11 ++- .../directconnectivity/StoreReader.java | 5 +- 3 files changed, 68 insertions(+), 20 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index f6b0bdc73a87..6349515d10a4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -14,13 +14,17 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.FlakyTestRetryAnalyzer; +import com.azure.cosmos.ReadConsistencyStrategy; import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; import com.azure.cosmos.implementation.throughputControl.TestItem; import com.azure.cosmos.models.CosmosBatch; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; @@ -34,7 +38,6 @@ import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; -import com.azure.cosmos.test.faultinjection.FaultInjectionEndpointBuilder; import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; import com.azure.cosmos.test.faultinjection.FaultInjectionRule; @@ -65,6 +68,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -72,6 +76,10 @@ import static org.testng.Assert.fail; public class ClientRetryPolicyE2ETests extends TestSuiteBase { + + private ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor cosmosDiagnosticsAccessor = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + private CosmosAsyncClient clientWithPreferredRegions; private CosmosAsyncContainer cosmosAsyncContainerFromClientWithPreferredRegions; private CosmosAsyncClient clientWithoutPreferredRegions; @@ -545,7 +553,10 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( CosmosAsyncClient testClient = getClientBuilder() .preferredRegions(shouldUsePreferredRegionsOnClient ? this.preferredRegions : Collections.emptyList()) .directMode() + // required to force a quorum read irrespective of account consistency level + .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) .buildAsyncClient(); + CosmosAsyncContainer testContainer = getSharedSinglePartitionCosmosContainer(testClient); try { @@ -554,13 +565,12 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(leaseNotFoundFaultRule)).block(); - Instant timeStart = Instant.now(); + AtomicReference timeStartRef = new AtomicReference<>(); + AtomicReference timeEndRef = new AtomicReference<>(); CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany).block(); - Instant timeEnd = Instant.now(); - if (shouldRetryCrossRegion) { assertThat(cosmosDiagnostics).isNotNull(); assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull(); @@ -580,7 +590,7 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); } - assertThat(Duration.between(timeStart, timeEnd)).isLessThan(Duration.ofSeconds(5)); + assertThat(Duration.between(timeStartRef.get(), timeEndRef.get())).isLessThan(Duration.ofSeconds(5)); } finally { leaseNotFoundFaultRule.disable(); @@ -592,8 +602,10 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( } } - @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider", timeOut = TIMEOUT) - public void dataPlaneRequestHitsLeaseNotFoundAndGoneInFirstPreferredRegion( + @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider"/*, timeOut = TIMEOUT*/) + // Inject 410-1022 and 429-3200 into the 2 replicas participating in quorum read + // Validate that the client fails fast in the first preferred region and retries in the next region if possible (in a window <<60s) + public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRegion( OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, boolean shouldUsePreferredRegionsOnClient, @@ -640,7 +652,7 @@ public void dataPlaneRequestHitsLeaseNotFoundAndGoneInFirstPreferredRegion( .hitLimit(1) .build(); - FaultInjectionRule goneRule = new FaultInjectionRuleBuilder("gone-" + UUID.randomUUID()) + FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder("too-many-requests-" + UUID.randomUUID()) .condition( new FaultInjectionConditionBuilder() .operationType(faultInjectionOperationType) @@ -648,7 +660,7 @@ public void dataPlaneRequestHitsLeaseNotFoundAndGoneInFirstPreferredRegion( .region(this.preferredRegions.get(0)) .build()) .result( - FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.GONE) + FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) .build() ) .duration(Duration.ofMinutes(5)) @@ -657,6 +669,8 @@ public void dataPlaneRequestHitsLeaseNotFoundAndGoneInFirstPreferredRegion( CosmosAsyncClient testClient = getClientBuilder() .preferredRegions(shouldUsePreferredRegionsOnClient ? this.preferredRegions : Collections.emptyList()) + // required to force a quorum read irrespective of account consistency level + .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) .directMode() .buildAsyncClient(); @@ -666,14 +680,16 @@ public void dataPlaneRequestHitsLeaseNotFoundAndGoneInFirstPreferredRegion( testContainer.createItem(createdItem).block(); - CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(leaseNotFoundFaultRule, goneRule)).block(); + CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(tooManyRequestsRule, leaseNotFoundFaultRule)).block(); - Instant timeStart = Instant.now(); + AtomicReference timeStartRef = new AtomicReference<>(); + AtomicReference timeEndRef = new AtomicReference<>(); CosmosDiagnostics cosmosDiagnostics - = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany).block(); - - Instant timeEnd = Instant.now(); + = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany) + .doOnSubscribe(ignore -> timeStartRef.set(Instant.now())) + .doFinally(signalType -> timeEndRef.set(Instant.now())) + .block(); if (shouldRetryCrossRegion) { assertThat(cosmosDiagnostics).isNotNull(); @@ -683,6 +699,10 @@ public void dataPlaneRequestHitsLeaseNotFoundAndGoneInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(2); assertThat(diagnosticsContext.getStatusCode()).isLessThan(HttpConstants.StatusCodes.BADREQUEST); + + if (operationType.isReadOnlyOperation()) { + validateCosmosDiagnosticsForMultiErrorCodesInQuorumRead(diagnosticsContext, Arrays.asList(410, 429)); + } } else { assertThat(cosmosDiagnostics).isNotNull(); assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull(); @@ -692,9 +712,13 @@ public void dataPlaneRequestHitsLeaseNotFoundAndGoneInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(1); assertThat(diagnosticsContext.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.SERVICE_UNAVAILABLE); assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); + + if (operationType.isReadOnlyOperation()) { + validateCosmosDiagnosticsForMultiErrorCodesInQuorumRead(diagnosticsContext, Arrays.asList(410, 429)); + } } - assertThat(Duration.between(timeStart, timeEnd)).isLessThan(Duration.ofSeconds(5)); + assertThat(Duration.between(timeStartRef.get(), timeEndRef.get())).isLessThan(Duration.ofSeconds(5)); } finally { leaseNotFoundFaultRule.disable(); @@ -986,6 +1010,24 @@ private Mono performDocumentOperation( } } + private void validateCosmosDiagnosticsForMultiErrorCodesInQuorumRead( + CosmosDiagnosticsContext cosmosDiagnosticsContext, + List expectedStatusCodes) { + + assertThat(cosmosDiagnosticsContext).isNotNull(); + + List actualStatusCodes = cosmosDiagnosticsContext.getDiagnostics() + .stream() + .map(diagnostics -> cosmosDiagnosticsAccessor.getClientSideRequestStatistics(diagnostics)) + .flatMap(clientSideRequestStatisticsCollection -> clientSideRequestStatisticsCollection.stream().map(ClientSideRequestStatistics::getResponseStatisticsList)) + .map(storeResponseStatisticsCollection -> storeResponseStatisticsCollection.stream().map(storeResponseStatistics -> storeResponseStatistics.getStoreResult().getStoreResponseDiagnostics()).collect(Collectors.toCollection(ArrayList::new))) + .map(storeResponseDiagnosticsCollection -> storeResponseDiagnosticsCollection.stream().map(StoreResponseDiagnostics::getStatusCode).collect(Collectors.toCollection(ArrayList::new))) + .flatMap(ArrayList::stream) + .collect(Collectors.toCollection(ArrayList::new)); + + assertThat(actualStatusCodes).containsAll(expectedStatusCodes); + } + private AccountLevelLocationContext getAccountLevelLocationContext(DatabaseAccount databaseAccount, boolean writeOnly) { Iterator locationIterator = writeOnly ? databaseAccount.getWritableLocations().iterator() : databaseAccount.getReadableLocations().iterator(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index beaa44c302e6..710c3d933155 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -12,6 +12,7 @@ import com.azure.cosmos.implementation.Exceptions; import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalServerErrorException; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.IAuthorizationTokenProvider; @@ -155,9 +156,13 @@ public Mono readStrongAsync( switch (secondaryQuorumReadResult.quorumResult) { case QuorumNotPossibleInCurrentRegion: - logger.warn("QuorumNotPossibleInCurrentRegion: ReadQuorumResult StoreResponses: {}", - String.join(";", secondaryQuorumReadResult.storeResponses)); - return Flux.error(secondaryQuorumReadResult.failFastException); + try { + logger.warn("QuorumNotPossibleInCurrentRegion: ReadQuorumResult StoreResponses: {}", + String.join(";", secondaryQuorumReadResult.storeResponses)); + return Flux.error(secondaryQuorumReadResult.failFastException); + } catch (CosmosException e) { + return Flux.error(e); + } case QuorumMet: try { return Flux.just(secondaryQuorumReadResult.getResponse()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index 9c605f5b233b..d1a8f6473807 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -302,10 +302,11 @@ private Flux> readFromReplicas(List resultCollect entity.requestContext.performedBackgroundAddressRefresh = true; } + // (collect quorum store results if possible) + // for QuorumReader (upstream) to make the final decision on quorum selection resultCollector.add(srr); - // continue to the next store result (collect quorum store results if possible) - // to not reattempt quorum selection + // continue to the next store result continue; } } From 50a1de62a9274b4b93ce849def989a0d56efe58f Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Fri, 29 Aug 2025 16:12:09 -0400 Subject: [PATCH 12/20] Adding replicaId and error code logs. --- .../com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java | 6 +++++- .../directconnectivity/QuorumReader.java | 10 +++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 6349515d10a4..0d119b0058eb 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -569,7 +569,10 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( AtomicReference timeEndRef = new AtomicReference<>(); CosmosDiagnostics cosmosDiagnostics - = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany).block(); + = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany) + .doOnSubscribe(ignored -> timeStartRef.set(Instant.now())) + .doFinally(signal -> timeEndRef.set(Instant.now())) + .block(); if (shouldRetryCrossRegion) { assertThat(cosmosDiagnostics).isNotNull(); @@ -722,6 +725,7 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe } finally { leaseNotFoundFaultRule.disable(); + tooManyRequestsRule.disable(); if (testClient != null) { cleanUpContainer(testContainer); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index 710c3d933155..79816eb08775 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -369,8 +369,11 @@ private Mono { StoreResponse storeResponse = response.getStoreResponse(); - if (storeResponse == null) { - return response.storePhysicalAddress + " -> n/a"; + if (storeResponse == null && response.getException() != null) { + + CosmosException cosmosException = response.getException(); + + return response.storePhysicalAddress + " -> " + "(" + cosmosException.getStatusCode() + "-" + cosmosException.getSubStatusCode() + ")"; } return response.storePhysicalAddress @@ -389,9 +392,10 @@ private Mono Date: Fri, 29 Aug 2025 18:05:32 -0400 Subject: [PATCH 13/20] Adding replicaId and error code logs. --- .../GoneAndRetryWithRetryPolicyTest.java | 30 +++++++++++++++++++ .../GoneAndRetryWithRetryPolicy.java | 2 +- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java index db66181e7619..7c5c9be002bd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java @@ -9,6 +9,7 @@ import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.InvalidPartitionException; +import com.azure.cosmos.implementation.LeaseNotFoundException; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionIsMigratingException; import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException; @@ -76,6 +77,35 @@ public void shouldRetryReadWithGoneException() { assertThat(shouldRetryResult.backOffTime.getSeconds()).isEqualTo(4); } + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldNotRetryReadWithLeaseNotFoundException() { + RxDocumentServiceRequest request = RxDocumentServiceRequest.create( + mockDiagnosticsClientContext(), + OperationType.Read, + ResourceType.Document); + GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); + Mono singleShouldRetry = goneAndRetryWithRetryPolicy + .shouldRetry(new LeaseNotFoundException("0", null)); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(1); + assertThat(shouldRetryResult.backOffTime).isNull(); + + request = RxDocumentServiceRequest.create( + mockDiagnosticsClientContext(), + OperationType.Create, + ResourceType.Document); + goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); + singleShouldRetry = goneAndRetryWithRetryPolicy + .shouldRetry(new LeaseNotFoundException("0", null)); + shouldRetryResult = singleShouldRetry.block(); + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(1); + assertThat(shouldRetryResult.backOffTime).isNull(); + } + /** * Retry with GoneException for write which is not yet sent to the wire, * retried 4 times and verified the returned diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java index 336b0241ae96..04ba424975e3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -231,7 +231,7 @@ public Mono shouldRetry(Exception exception) { exception); return Mono.just(ShouldRetryResult.noRetry(exceptionToThrow, - Quadruple.with(true, false, Duration.ofMillis(0), this.attemptCount.get()))); + Quadruple.with(true, true, Duration.ofMillis(0), this.attemptCount.get()))); } long remainingSeconds = this.waitTimeInSeconds - From 0fc8c5e64097c706bf475741e1691cfcd2390cc2 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Fri, 29 Aug 2025 18:43:23 -0400 Subject: [PATCH 14/20] Adding replicaId and error code logs. --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 80 +++++++++---------- .../directconnectivity/QuorumReader.java | 3 +- 2 files changed, 41 insertions(+), 42 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 0d119b0058eb..36ac90a0a407 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -104,45 +104,45 @@ public static Object[][] leaseNotFoundArgProvider() { return new Object[][]{ // OperationType, FaultInjectionOperationType, shouldUsePreferredRegionsOnClient, isReadMany { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false, 1 }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false, 1 }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false, 1 }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false, 1 }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false, 1 }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false, 1 }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false, 1 }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false, 1 }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false, 1 }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true, 1 }, - { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false, 1 }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false, 1 }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false, 1 }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false, 1 }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false, 1 }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false, 1 }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false, 1 }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false, 1 }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false, 1 }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true, 1 }, - { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false, 2 }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false, 2 }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false, 2 }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false, 2 }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false, 2 }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false, 2 }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false, 2 }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false, 2 }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false, 2 }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true, 2 }, - { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false, 2 }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false, 2 }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false, 2 }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false, 2 }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false, 2 }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false, 2 }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false, 2 }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false, 2 }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false, 2 }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true, 2 } +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false, 1 }, +// { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false, 1 }, +// { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false, 1 }, +// { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false, 1 }, +// { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false, 1 }, +// { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false, 1 }, +// { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false, 1 }, +// { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false, 1 }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true, 1 }, +// { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false, 1 }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false, 1 }, +// { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false, 1 }, +// { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false, 1 }, +// { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false, 1 }, +// { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false, 1 }, +// { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false, 1 }, +// { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false, 1 }, +// { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false, 1 }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true, 1 }, +// { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false, 2 }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false, 2 }, +// { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false, 2 }, +// { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false, 2 }, +// { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false, 2 }, +// { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false, 2 }, +// { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false, 2 }, +// { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false, 2 }, +// { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false, 2 }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true, 2 }, +// { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false, 2 }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false, 2 }, +// { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false, 2 }, +// { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false, 2 }, +// { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false, 2 }, +// { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false, 2 }, +// { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false, 2 }, +// { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false, 2 }, +// { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false, 2 }, +// { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true, 2 } }; } @@ -605,7 +605,7 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( } } - @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider"/*, timeOut = TIMEOUT*/) + @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider", timeOut = TIMEOUT) // Inject 410-1022 and 429-3200 into the 2 replicas participating in quorum read // Validate that the client fails fast in the first preferred region and retries in the next region if possible (in a window <<60s) public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRegion( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index 79816eb08775..86bd30f8bad3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -914,7 +914,6 @@ protected boolean isValidResult() { private enum PrimaryReadOutcome { QuorumNotMet, // Primary LSN is not committed. QuorumInconclusive, // Secondary replicas are available. Must read R secondary's to deduce current quorum. - QuorumMet, - QuorumNotPossibleInCurrentRegion + QuorumMet } } From 3b643ee81bd6294a935830b7e05393810caf0292 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Fri, 29 Aug 2025 19:07:14 -0400 Subject: [PATCH 15/20] Adding replicaId and error code logs. --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 1dd1c032495e..f4aed3ebdc04 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -70,6 +70,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -77,6 +78,10 @@ import static org.testng.Assert.fail; public class ClientRetryPolicyE2ETests extends TestSuiteBase { + + private static final ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor cosmosDiagnosticsAccessor = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + private CosmosAsyncClient clientWithPreferredRegions; private CosmosAsyncContainer cosmosAsyncContainerFromClientWithPreferredRegions; private CosmosAsyncClient clientWithoutPreferredRegions; @@ -219,7 +224,7 @@ public static Object[] preferredRegionsConfigProvider() { @Test(groups = { "multi-master" }, dataProvider = "preferredRegionsConfigProvider", timeOut = TIMEOUT) public void queryPlanHttpTimeoutWillNotMarkRegionUnavailable(boolean shouldUsePreferredRegionsOnClient) { - TestObject newItem = TestObject.create(); + TestItem newItem = TestItem.createNewItem(); CosmosAsyncContainer resultantCosmosAsyncContainer; CosmosAsyncClient resultantCosmosAsyncClient; @@ -262,7 +267,7 @@ public void queryPlanHttpTimeoutWillNotMarkRegionUnavailable(boolean shouldUsePr try { // validate the query plan will be retried in a different region and the final requests will be succeeded // TODO: Also capture all retries for metadata requests in the diagnostics - FeedResponse firstPage = cosmosAsyncContainerFromClientWithPreferredRegions.queryItems(query, queryRequestOptions, TestObject.class) + FeedResponse firstPage = cosmosAsyncContainerFromClientWithPreferredRegions.queryItems(query, queryRequestOptions, TestItem.class) .byPage() .blockFirst(); @@ -297,7 +302,7 @@ public void addressRefreshHttpTimeoutWillDoCrossRegionRetryForReads(boolean shou throw new SkipException("queryPlanHttpTimeoutWillNotMarkRegionUnavailable() is only meant for DIRECT mode"); } - TestObject newItem = TestObject.create(); + TestItem newItem = TestItem.createNewItem(); resultantCosmosAsyncContainer.createItem(newItem).block(); // create fault injection rules for address refresh @@ -332,8 +337,8 @@ public void addressRefreshHttpTimeoutWillDoCrossRegionRetryForReads(boolean shou resultantCosmosAsyncContainer, Arrays.asList(addressRefreshDelayRule, serverGoneRule)).block(); try { - CosmosItemResponse itemResponse = resultantCosmosAsyncContainer - .readItem(newItem.getId(), new PartitionKey(newItem.getId()), TestObject.class) + CosmosItemResponse itemResponse = resultantCosmosAsyncContainer + .readItem(newItem.getId(), new PartitionKey(newItem.getId()), TestItem.class) .block(); assertThat(itemResponse).isNotNull(); @@ -402,7 +407,7 @@ public void addressRefreshHttpTimeoutWillNotDoCrossRegionRetryForWrites(boolean resultantCosmosAsyncContainer, Arrays.asList(addressRefreshDelayRule, serverGoneRule)).block(); try { - TestObject newItem = TestObject.create(); + TestItem newItem = TestItem.createNewItem(); resultantCosmosAsyncContainer.createItem(newItem).block(); } catch (CosmosException e) { assertThat(e.getDiagnostics().getContactedRegionNames().size()).isEqualTo(1); @@ -440,7 +445,7 @@ public void dataPlaneRequestHttpTimeout( throw new SkipException("queryPlanHttpTimeoutWillNotMarkRegionUnavailable() is only meant for GATEWAY mode"); } - TestObject newItem = TestObject.create(); + TestItem newItem = TestItem.createNewItem(); resultantCosmosAsyncContainer.createItem(newItem).block(); FaultInjectionRule requestHttpTimeoutRule = new FaultInjectionRuleBuilder("requestHttpTimeoutRule" + UUID.randomUUID()) .condition( @@ -530,7 +535,7 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( throw new SkipException("leaseNotFound is only meant for Direct mode"); } - TestObject createdItem = TestObject.create(); + TestItem createdItem = TestItem.createNewItem(); FaultInjectionRule leaseNotFoundFaultRule = new FaultInjectionRuleBuilder("leaseNotFound-" + UUID.randomUUID()) .condition( From b3444c9426cf9b71daa323a46d29e6bae50c7408 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Tue, 2 Sep 2025 15:55:03 -0400 Subject: [PATCH 16/20] Specify work item for barrier handling on 410-1022s --- .../cosmos/implementation/directconnectivity/StoreReader.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index d1a8f6473807..0f30496d3b97 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -282,6 +282,7 @@ private Flux> readFromReplicas(List resultCollect if (srr.isAvoidQuorumSelectionException) { // todo: fail fast when barrier requests also hit isAvoidQuorumSelectionException? + // todo: https://github.com/Azure/azure-sdk-for-java/issues/46135 if (!entity.isBarrierRequest) { // isAvoidQuorumSelectionException is a special case where we want to enable the enclosing data plane operation From 03bba5b142f726c9f095a7e6b8686aea4dfcd029 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Tue, 2 Sep 2025 17:02:22 -0400 Subject: [PATCH 17/20] Addressing review comments. --- .../com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java | 8 ++++++-- .../implementation/directconnectivity/QuorumReader.java | 4 ++-- .../implementation/directconnectivity/StoreReader.java | 6 ++---- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index f4aed3ebdc04..cb4b20dc3352 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -923,7 +923,9 @@ private Mono performDocumentOperation( } if (operationType == OperationType.Create) { - return cosmosAsyncContainer.createItem(TestItem.createNewItem()).map(itemResponse -> itemResponse.getDiagnostics()) .onErrorResume(throwable -> { + return cosmosAsyncContainer.createItem(TestItem.createNewItem()) + .map(itemResponse -> itemResponse.getDiagnostics()) + .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { CosmosException cosmosException = (CosmosException) throwable; @@ -934,7 +936,9 @@ private Mono performDocumentOperation( } if (operationType == OperationType.Upsert) { - return cosmosAsyncContainer.upsertItem(TestItem.createNewItem()).map(itemResponse -> itemResponse.getDiagnostics()) .onErrorResume(throwable -> { + return cosmosAsyncContainer.upsertItem(TestItem.createNewItem()) + .map(itemResponse -> itemResponse.getDiagnostics()) + .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { CosmosException cosmosException = (CosmosException) throwable; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index 86bd30f8bad3..1f01e66076d6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -373,7 +373,7 @@ private Mono " + "(" + cosmosException.getStatusCode() + "-" + cosmosException.getSubStatusCode() + ")"; + return response.storePhysicalAddress + " -> " + "(" + cosmosException.getStatusCode() + "-" + cosmosException.getSubStatusCode() + ")"; } return response.storePhysicalAddress @@ -914,6 +914,6 @@ protected boolean isValidResult() { private enum PrimaryReadOutcome { QuorumNotMet, // Primary LSN is not committed. QuorumInconclusive, // Secondary replicas are available. Must read R secondary's to deduce current quorum. - QuorumMet + QuorumMet, } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index 0f30496d3b97..f336b457c0ef 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -290,10 +290,8 @@ private Flux> readFromReplicas(List resultCollect // no attempts to reselect quorum will be made if (logger.isDebugEnabled()) { - int statusCode, subStatusCode; - - statusCode = srr.getException() != null ? srr.getException().getStatusCode() : 0; - subStatusCode = srr.getException() != null ? srr.getException().getSubStatusCode() : 0; + int statusCode = srr.getException() != null ? srr.getException().getStatusCode() : 0; + int subStatusCode = srr.getException() != null ? srr.getException().getSubStatusCode() : 0; logger.debug("An exception with error code [{}-{}] was observed which means quorum cannot be attained in the current region!", statusCode, subStatusCode); } From 3b9518af7eaeae0654ef95c235daafa8f3f524ce Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Tue, 2 Sep 2025 17:04:50 -0400 Subject: [PATCH 18/20] Updated CHANGELOG.md --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 468efce9aab1..0af79740875b 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -11,6 +11,7 @@ * Fixed 404/1002 for query when container recreated with same name. - [PR 45930](https://github.com/Azure/azure-sdk-for-java/pull/45930) * Fixed Null Pointer Exception for query when container recreated with same name. - [PR 45930](https://github.com/Azure/azure-sdk-for-java/pull/45930) * Fixed Null Pointer Exception for readMany when container recreated with same name. - [PR 45930](https://github.com/Azure/azure-sdk-for-java/pull/45930) +* Fixed Strong Consistency violation when a single replica in a partition returns a 410 `Lease Not Found`. - [PR 46433](https://github.com/Azure/azure-sdk-for-java/pull/46433) #### Other Changes * Added change to optimize lease checkpointing in `ChangeFeedProcessor` by conditionally executing checkpoint operations for 304 responses based on continuation token comparison, which helps to reduce RU consumption for unchanged feeds. See [PR 46521](https://github.com/Azure/azure-sdk-for-java/pull/46521) From 970ee2de8effaedd7302c35bd22738f47ecb846f Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Wed, 3 Sep 2025 23:27:19 -0400 Subject: [PATCH 19/20] Addressing review comments. --- .../cosmos/rx/ClientRetryPolicyE2ETests.java | 62 +++++++------------ 1 file changed, 24 insertions(+), 38 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index cb4b20dc3352..8d4e7fe2570b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -26,7 +26,6 @@ import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; -import com.azure.cosmos.implementation.throughputControl.TestItem; import com.azure.cosmos.models.CosmosBatch; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.CosmosItemIdentity; @@ -39,7 +38,6 @@ import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; -import com.azure.cosmos.test.faultinjection.FaultInjectionEndpointBuilder; import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; import com.azure.cosmos.test.faultinjection.FaultInjectionRule; @@ -104,7 +102,7 @@ public static Object[][] channelAcquisitionExceptionArgProvider() { @DataProvider(name = "leaseNotFoundArgProvider") public static Object[][] leaseNotFoundArgProvider() { return new Object[][]{ - // OperationType, FaultInjectionOperationType, shouldUsePreferredRegionsOnClient, isReadMany + // OperationType, FaultInjectionOperationType, shouldUsePreferredRegionsOnClient, isReadMany, hitLimit (1 or 2) for lease not found { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false, 1 }, { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false, 1 }, { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false, 1 }, @@ -224,7 +222,7 @@ public static Object[] preferredRegionsConfigProvider() { @Test(groups = { "multi-master" }, dataProvider = "preferredRegionsConfigProvider", timeOut = TIMEOUT) public void queryPlanHttpTimeoutWillNotMarkRegionUnavailable(boolean shouldUsePreferredRegionsOnClient) { - TestItem newItem = TestItem.createNewItem(); + TestObject newItem = TestObject.create(); CosmosAsyncContainer resultantCosmosAsyncContainer; CosmosAsyncClient resultantCosmosAsyncClient; @@ -267,7 +265,7 @@ public void queryPlanHttpTimeoutWillNotMarkRegionUnavailable(boolean shouldUsePr try { // validate the query plan will be retried in a different region and the final requests will be succeeded // TODO: Also capture all retries for metadata requests in the diagnostics - FeedResponse firstPage = cosmosAsyncContainerFromClientWithPreferredRegions.queryItems(query, queryRequestOptions, TestItem.class) + FeedResponse firstPage = cosmosAsyncContainerFromClientWithPreferredRegions.queryItems(query, queryRequestOptions, TestObject.class) .byPage() .blockFirst(); @@ -302,7 +300,7 @@ public void addressRefreshHttpTimeoutWillDoCrossRegionRetryForReads(boolean shou throw new SkipException("queryPlanHttpTimeoutWillNotMarkRegionUnavailable() is only meant for DIRECT mode"); } - TestItem newItem = TestItem.createNewItem(); + TestObject newItem = TestObject.create(); resultantCosmosAsyncContainer.createItem(newItem).block(); // create fault injection rules for address refresh @@ -337,8 +335,8 @@ public void addressRefreshHttpTimeoutWillDoCrossRegionRetryForReads(boolean shou resultantCosmosAsyncContainer, Arrays.asList(addressRefreshDelayRule, serverGoneRule)).block(); try { - CosmosItemResponse itemResponse = resultantCosmosAsyncContainer - .readItem(newItem.getId(), new PartitionKey(newItem.getId()), TestItem.class) + CosmosItemResponse itemResponse = resultantCosmosAsyncContainer + .readItem(newItem.getId(), new PartitionKey(newItem.getId()), TestObject.class) .block(); assertThat(itemResponse).isNotNull(); @@ -407,7 +405,7 @@ public void addressRefreshHttpTimeoutWillNotDoCrossRegionRetryForWrites(boolean resultantCosmosAsyncContainer, Arrays.asList(addressRefreshDelayRule, serverGoneRule)).block(); try { - TestItem newItem = TestItem.createNewItem(); + TestObject newItem = TestObject.create(); resultantCosmosAsyncContainer.createItem(newItem).block(); } catch (CosmosException e) { assertThat(e.getDiagnostics().getContactedRegionNames().size()).isEqualTo(1); @@ -445,7 +443,7 @@ public void dataPlaneRequestHttpTimeout( throw new SkipException("queryPlanHttpTimeoutWillNotMarkRegionUnavailable() is only meant for GATEWAY mode"); } - TestItem newItem = TestItem.createNewItem(); + TestObject newItem = TestObject.create(); resultantCosmosAsyncContainer.createItem(newItem).block(); FaultInjectionRule requestHttpTimeoutRule = new FaultInjectionRuleBuilder("requestHttpTimeoutRule" + UUID.randomUUID()) .condition( @@ -535,7 +533,7 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( throw new SkipException("leaseNotFound is only meant for Direct mode"); } - TestItem createdItem = TestItem.createNewItem(); + TestObject createdItem = TestObject.create(); FaultInjectionRule leaseNotFoundFaultRule = new FaultInjectionRuleBuilder("leaseNotFound-" + UUID.randomUUID()) .condition( @@ -567,13 +565,8 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(leaseNotFoundFaultRule)).block(); - AtomicReference timeStartRef = new AtomicReference<>(); - AtomicReference timeEndRef = new AtomicReference<>(); - CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany) - .doOnSubscribe(ignored -> timeStartRef.set(Instant.now())) - .doFinally(signal -> timeEndRef.set(Instant.now())) .block(); if (shouldRetryCrossRegion) { @@ -584,6 +577,7 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(2); assertThat(diagnosticsContext.getStatusCode()).isLessThan(HttpConstants.StatusCodes.BADREQUEST); + assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(5)); } else { assertThat(cosmosDiagnostics).isNotNull(); assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull(); @@ -593,10 +587,9 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(1); assertThat(diagnosticsContext.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.SERVICE_UNAVAILABLE); assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); + assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(5)); } - assertThat(Duration.between(timeStartRef.get(), timeEndRef.get())).isLessThan(Duration.ofSeconds(5)); - } finally { leaseNotFoundFaultRule.disable(); @@ -640,7 +633,7 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe throw new SkipException("leaseNotFound is only meant for Direct mode"); } - TestItem createdItem = TestItem.createNewItem(); + TestObject createdItem = TestObject.create(); FaultInjectionRule leaseNotFoundFaultRule = new FaultInjectionRuleBuilder("leaseNotFound-" + UUID.randomUUID()) .condition( @@ -687,13 +680,8 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(tooManyRequestsRule, leaseNotFoundFaultRule)).block(); - AtomicReference timeStartRef = new AtomicReference<>(); - AtomicReference timeEndRef = new AtomicReference<>(); - CosmosDiagnostics cosmosDiagnostics = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany) - .doOnSubscribe(ignore -> timeStartRef.set(Instant.now())) - .doFinally(signalType -> timeEndRef.set(Instant.now())) .block(); if (shouldRetryCrossRegion) { @@ -723,8 +711,6 @@ public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRe } } - assertThat(Duration.between(timeStartRef.get(), timeEndRef.get())).isLessThan(Duration.ofSeconds(5)); - } finally { leaseNotFoundFaultRule.disable(); tooManyRequestsRule.disable(); @@ -783,7 +769,7 @@ public void channelAcquisitionExceptionOnWrites( CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(channelAcquisitionExceptionRule)).block(); try { - TestItem createdItem = TestItem.createNewItem(); + TestObject createdItem = TestObject.create(); testContainer.createItem(createdItem).block(); // using a higher concurrency to force channelAcquisitionException to happen @@ -847,16 +833,16 @@ private boolean isChannelAcquisitionExceptionTriggeredRegionRetryExists(String c private Mono performDocumentOperation( CosmosAsyncContainer cosmosAsyncContainer, OperationType operationType, - TestItem createdItem, - Function extractPartitionKeyFunc, + TestObject createdItem, + Function extractPartitionKeyFunc, boolean isReadMany) { try { if (operationType == OperationType.Query && isReadMany) { CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions(); String query = String.format("SELECT * from c where c.id = '%s'", createdItem.getId()); - FeedResponse itemFeedResponse = - cosmosAsyncContainer.queryItems(query, queryRequestOptions, TestItem.class).byPage().blockFirst(); + FeedResponse itemFeedResponse = + cosmosAsyncContainer.queryItems(query, queryRequestOptions, TestObject.class).byPage().blockFirst(); return Mono.just(itemFeedResponse.getCosmosDiagnostics()) .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { @@ -881,7 +867,7 @@ private Mono performDocumentOperation( .readItem( createdItem.getId(), extractPartitionKeyFunc.apply(createdItem), - TestItem.class + TestObject.class ) .map(itemResponse -> itemResponse.getDiagnostics()) .onErrorResume(throwable -> { @@ -923,7 +909,7 @@ private Mono performDocumentOperation( } if (operationType == OperationType.Create) { - return cosmosAsyncContainer.createItem(TestItem.createNewItem()) + return cosmosAsyncContainer.createItem(TestObject.create()) .map(itemResponse -> itemResponse.getDiagnostics()) .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { @@ -936,7 +922,7 @@ private Mono performDocumentOperation( } if (operationType == OperationType.Upsert) { - return cosmosAsyncContainer.upsertItem(TestItem.createNewItem()) + return cosmosAsyncContainer.upsertItem(TestObject.create()) .map(itemResponse -> itemResponse.getDiagnostics()) .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { @@ -956,7 +942,7 @@ private Mono performDocumentOperation( createdItem.getId(), extractPartitionKeyFunc.apply(createdItem), patchOperations, - TestItem.class) + TestObject.class) .map(itemResponse -> itemResponse.getDiagnostics()) .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { @@ -991,8 +977,8 @@ private Mono performDocumentOperation( CosmosChangeFeedRequestOptions changeFeedRequestOptions = CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(feedRanges.get(0)); - FeedResponse firstPage = cosmosAsyncContainer - .queryChangeFeed(changeFeedRequestOptions, TestItem.class) + FeedResponse firstPage = cosmosAsyncContainer + .queryChangeFeed(changeFeedRequestOptions, TestObject.class) .byPage() .blockFirst(); return Mono.just(firstPage.getCosmosDiagnostics()); @@ -1002,7 +988,7 @@ private Mono performDocumentOperation( return cosmosAsyncContainer .readMany( Arrays.asList(new CosmosItemIdentity(extractPartitionKeyFunc.apply(createdItem), createdItem.getId()), new CosmosItemIdentity(extractPartitionKeyFunc.apply(createdItem), createdItem.getId())), - TestItem.class) + TestObject.class) .map(itemResponse -> itemResponse.getCosmosDiagnostics()) .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { From e736302cba53d65c049d94292bcd463c69c221a2 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Wed, 3 Sep 2025 23:28:56 -0400 Subject: [PATCH 20/20] Addressing review comments. --- .../cosmos/implementation/directconnectivity/StoreReader.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index f336b457c0ef..5169dfb121a7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -305,6 +305,9 @@ private Flux> readFromReplicas(List resultCollect // for QuorumReader (upstream) to make the final decision on quorum selection resultCollector.add(srr); + // Remaining replicas + replicasToRead.set(replicaCountToRead - resultCollector.size()); + // continue to the next store result continue; }