diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java index db66181e7619..7c5c9be002bd 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java @@ -9,6 +9,7 @@ import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.InvalidPartitionException; +import com.azure.cosmos.implementation.LeaseNotFoundException; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionIsMigratingException; import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException; @@ -76,6 +77,35 @@ public void shouldRetryReadWithGoneException() { assertThat(shouldRetryResult.backOffTime.getSeconds()).isEqualTo(4); } + @Test(groups = { "unit" }, timeOut = TIMEOUT) + public void shouldNotRetryReadWithLeaseNotFoundException() { + RxDocumentServiceRequest request = RxDocumentServiceRequest.create( + mockDiagnosticsClientContext(), + OperationType.Read, + ResourceType.Document); + GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); + Mono singleShouldRetry = goneAndRetryWithRetryPolicy + .shouldRetry(new LeaseNotFoundException("0", null)); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(1); + assertThat(shouldRetryResult.backOffTime).isNull(); + + request = RxDocumentServiceRequest.create( + mockDiagnosticsClientContext(), + OperationType.Create, + ResourceType.Document); + goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); + singleShouldRetry = goneAndRetryWithRetryPolicy + .shouldRetry(new LeaseNotFoundException("0", null)); + shouldRetryResult = singleShouldRetry.block(); + assertThat(shouldRetryResult.shouldRetry).isFalse(); + assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); + assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(1); + assertThat(shouldRetryResult.backOffTime).isNull(); + } + /** * Retry with GoneException for write which is not yet sent to the wire, * retried 4 times and verified the returned diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java index 710946c2325c..27052ab0d7a5 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreReaderTest.java @@ -7,11 +7,13 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.DiagnosticsClientContext; import com.azure.cosmos.implementation.DocumentServiceRequestContext; +import com.azure.cosmos.implementation.Exceptions; import com.azure.cosmos.implementation.FailureValidator; import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ISessionContainer; import com.azure.cosmos.implementation.ISessionToken; +import com.azure.cosmos.implementation.LeaseNotFoundException; import com.azure.cosmos.implementation.NotFoundException; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionIsMigratingException; @@ -149,6 +151,7 @@ public Object[][] storeResponseArgProvider() { { new PartitionKeyRangeIsSplittingException() , null, }, { new PartitionIsMigratingException(), null, }, { new GoneException(), null, }, + { new LeaseNotFoundException(null, 0L, null, null), null }, { null, Mockito.mock(StoreResponse.class), } }; } @@ -915,9 +918,15 @@ public void storeResponseRecordedOnException(Exception ex, StoreResponse storeRe try { StoreReader.verifyCanContinueOnException((CosmosException) ex); - // for continuable exception, SDK will retry on all other replicas, so the failed endpoints should match replica counts. - List expectedFailedEndpoints = Arrays.asList(primaryUri, secondaryUri1, secondaryUri2, secondaryUri3); - assertThat(dsr.requestContext.getFailedEndpoints()).hasSize(expectedFailedEndpoints.size()).containsAll(expectedFailedEndpoints); + if (Exceptions.isAvoidQuorumSelectionException((CosmosException) ex)) { + // while the exception is continuable, it is avoid quorum selection exception, so such results are collected + // while these results are not valid, they are still collected in the failed endpoints and also contribute towards decrementing replicaCountToRead to avoid quorum reselection. + assertThat(dsr.requestContext.getFailedEndpoints().size()).isEqualTo(3); + } else { + // for continuable exception, SDK will retry on all other replicas, so the failed endpoints should match replica counts. + List expectedFailedEndpoints = Arrays.asList(primaryUri, secondaryUri1, secondaryUri2, secondaryUri3); + assertThat(dsr.requestContext.getFailedEndpoints()).hasSize(expectedFailedEndpoints.size()).containsAll(expectedFailedEndpoints); + } } catch (Exception exception) { if (exception instanceof CosmosException) { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java index 5144d7bbc47d..8d4e7fe2570b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ClientRetryPolicyE2ETests.java @@ -14,14 +14,18 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.DirectConnectionConfig; import com.azure.cosmos.FlakyTestRetryAnalyzer; +import com.azure.cosmos.ReadConsistencyStrategy; import com.azure.cosmos.TestObject; import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; import com.azure.cosmos.models.CosmosBatch; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.CosmosItemIdentity; @@ -34,7 +38,6 @@ import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper; import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder; import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType; -import com.azure.cosmos.test.faultinjection.FaultInjectionEndpointBuilder; import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType; import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders; import com.azure.cosmos.test.faultinjection.FaultInjectionRule; @@ -65,6 +68,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -72,6 +76,10 @@ import static org.testng.Assert.fail; public class ClientRetryPolicyE2ETests extends TestSuiteBase { + + private static final ImplementationBridgeHelpers.CosmosDiagnosticsHelper.CosmosDiagnosticsAccessor cosmosDiagnosticsAccessor = + ImplementationBridgeHelpers.CosmosDiagnosticsHelper.getCosmosDiagnosticsAccessor(); + private CosmosAsyncClient clientWithPreferredRegions; private CosmosAsyncContainer cosmosAsyncContainerFromClientWithPreferredRegions; private CosmosAsyncClient clientWithoutPreferredRegions; @@ -94,27 +102,47 @@ public static Object[][] channelAcquisitionExceptionArgProvider() { @DataProvider(name = "leaseNotFoundArgProvider") public static Object[][] leaseNotFoundArgProvider() { return new Object[][]{ - // OperationType, FaultInjectionOperationType, shouldUsePreferredRegionsOnClient, isReadMany - { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true }, - { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false }, - { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false }, - { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false }, - { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false }, - { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false }, - { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false }, - { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false }, - { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false }, - { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true } + // OperationType, FaultInjectionOperationType, shouldUsePreferredRegionsOnClient, isReadMany, hitLimit (1 or 2) for lease not found + { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false, 1 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false, 1 }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false, 1 }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false, 1 }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false, 1 }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false, 1 }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false, 1 }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false, 1 }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false, 1 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true, 1 }, + { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false, 1 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false, 1 }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false, 1 }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false, 1 }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false, 1 }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false, 1 }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false, 1 }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false, 1 }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false, 1 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true, 1 }, + { OperationType.Read, FaultInjectionOperationType.READ_ITEM, true, false, 2 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, false, 2 }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, true, false, 2 }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, true, false, 2 }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, true, false, 2 }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, true, false, 2 }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, true, false, 2 }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, true, false, 2 }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, true, false, 2 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, true, true, 2 }, + { OperationType.Read, FaultInjectionOperationType.READ_ITEM, false, false, 2 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, false, 2 }, + { OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, false, false, 2 }, + { OperationType.Patch, FaultInjectionOperationType.PATCH_ITEM, false, false, 2 }, + { OperationType.Replace, FaultInjectionOperationType.REPLACE_ITEM, false, false, 2 }, + { OperationType.Delete, FaultInjectionOperationType.DELETE_ITEM, false, false, 2 }, + { OperationType.Upsert, FaultInjectionOperationType.UPSERT_ITEM, false, false, 2 }, + { OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, false, false, 2 }, + { OperationType.Batch, FaultInjectionOperationType.BATCH_ITEM, false, false, 2 }, + { OperationType.Query, FaultInjectionOperationType.QUERY_ITEM, false, true, 2 } }; } @@ -155,13 +183,11 @@ public void beforeClass() { this.clientWithPreferredRegions = getClientBuilder() .preferredRegions(this.preferredRegions) - .consistencyLevel(ConsistencyLevel.SESSION) .endpointDiscoveryEnabled(true) .multipleWriteRegionsEnabled(true) .buildAsyncClient(); this.clientWithoutPreferredRegions = getClientBuilder() - .consistencyLevel(ConsistencyLevel.SESSION) .endpointDiscoveryEnabled(true) .multipleWriteRegionsEnabled(true) .buildAsyncClient(); @@ -481,7 +507,8 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( OperationType operationType, FaultInjectionOperationType faultInjectionOperationType, boolean shouldUsePreferredRegionsOnClient, - boolean isReadMany) { + boolean isReadMany, + int hitLimit) { boolean shouldRetryCrossRegion = false; @@ -513,19 +540,23 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( new FaultInjectionConditionBuilder() .operationType(faultInjectionOperationType) .connectionType(FaultInjectionConnectionType.DIRECT) + .region(this.preferredRegions.get(0)) .build()) .result( FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.LEASE_NOT_FOUND) .build() ) .duration(Duration.ofMinutes(5)) - .hitLimit(1) + .hitLimit(hitLimit) .build(); CosmosAsyncClient testClient = getClientBuilder() .preferredRegions(shouldUsePreferredRegionsOnClient ? this.preferredRegions : Collections.emptyList()) .directMode() + // required to force a quorum read irrespective of account consistency level + .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) .buildAsyncClient(); + CosmosAsyncContainer testContainer = getSharedSinglePartitionCosmosContainer(testClient); try { @@ -534,12 +565,9 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(leaseNotFoundFaultRule)).block(); - Instant timeStart = Instant.now(); - CosmosDiagnostics cosmosDiagnostics - = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany).block(); - - Instant timeEnd = Instant.now(); + = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany) + .block(); if (shouldRetryCrossRegion) { assertThat(cosmosDiagnostics).isNotNull(); @@ -549,6 +577,7 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(2); assertThat(diagnosticsContext.getStatusCode()).isLessThan(HttpConstants.StatusCodes.BADREQUEST); + assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(5)); } else { assertThat(cosmosDiagnostics).isNotNull(); assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull(); @@ -558,12 +587,133 @@ public void dataPlaneRequestHitsLeaseNotFoundInFirstPreferredRegion( assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(1); assertThat(diagnosticsContext.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.SERVICE_UNAVAILABLE); assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); + assertThat(diagnosticsContext.getDuration()).isLessThan(Duration.ofSeconds(5)); + } + + } finally { + leaseNotFoundFaultRule.disable(); + + if (testClient != null) { + cleanUpContainer(testContainer); + testClient.close(); } + } + } + + @Test(groups = { "fast", "fi-multi-master", "multi-region" }, dataProvider = "leaseNotFoundArgProvider", timeOut = TIMEOUT) + // Inject 410-1022 and 429-3200 into the 2 replicas participating in quorum read + // Validate that the client fails fast in the first preferred region and retries in the next region if possible (in a window <<60s) + public void dataPlaneRequestHitsLeaseNotFoundAndResourceThrottleFirstPreferredRegion( + OperationType operationType, + FaultInjectionOperationType faultInjectionOperationType, + boolean shouldUsePreferredRegionsOnClient, + boolean isReadMany, + int hitLimit) { + + boolean shouldRetryCrossRegion = false; + + if (Utils.isWriteOperation(operationType) && this.serviceOrderedWriteableRegions.size() > 1) { + shouldRetryCrossRegion = true; + } else if (!Utils.isWriteOperation(operationType) && this.serviceOrderedReadableRegions.size() > 1) { + shouldRetryCrossRegion = true; + } + + CosmosAsyncClient resultantCosmosAsyncClient; + + if (shouldUsePreferredRegionsOnClient) { + resultantCosmosAsyncClient = this.clientWithPreferredRegions; + } else { + resultantCosmosAsyncClient = this.clientWithoutPreferredRegions; + } + + if (BridgeInternal + .getContextClient(resultantCosmosAsyncClient) + .getConnectionPolicy() + .getConnectionMode() == ConnectionMode.GATEWAY) { + throw new SkipException("leaseNotFound is only meant for Direct mode"); + } + + TestObject createdItem = TestObject.create(); + + FaultInjectionRule leaseNotFoundFaultRule = new FaultInjectionRuleBuilder("leaseNotFound-" + UUID.randomUUID()) + .condition( + new FaultInjectionConditionBuilder() + .operationType(faultInjectionOperationType) + .connectionType(FaultInjectionConnectionType.DIRECT) + .region(this.preferredRegions.get(0)) + .build()) + .result( + FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.LEASE_NOT_FOUND) + .build() + ) + .duration(Duration.ofMinutes(5)) + .hitLimit(1) + .build(); + + FaultInjectionRule tooManyRequestsRule = new FaultInjectionRuleBuilder("too-many-requests-" + UUID.randomUUID()) + .condition( + new FaultInjectionConditionBuilder() + .operationType(faultInjectionOperationType) + .connectionType(FaultInjectionConnectionType.DIRECT) + .region(this.preferredRegions.get(0)) + .build()) + .result( + FaultInjectionResultBuilders.getResultBuilder(FaultInjectionServerErrorType.TOO_MANY_REQUEST) + .build() + ) + .duration(Duration.ofMinutes(5)) + .hitLimit(1) + .build(); - assertThat(Duration.between(timeStart, timeEnd)).isLessThan(Duration.ofSeconds(5)); + CosmosAsyncClient testClient = getClientBuilder() + .preferredRegions(shouldUsePreferredRegionsOnClient ? this.preferredRegions : Collections.emptyList()) + // required to force a quorum read irrespective of account consistency level + .readConsistencyStrategy(ReadConsistencyStrategy.LATEST_COMMITTED) + .directMode() + .buildAsyncClient(); + + CosmosAsyncContainer testContainer = getSharedSinglePartitionCosmosContainer(testClient); + + try { + + testContainer.createItem(createdItem).block(); + + CosmosFaultInjectionHelper.configureFaultInjectionRules(testContainer, Arrays.asList(tooManyRequestsRule, leaseNotFoundFaultRule)).block(); + + CosmosDiagnostics cosmosDiagnostics + = this.performDocumentOperation(testContainer, operationType, createdItem, testItem -> new PartitionKey(testItem.getMypk()), isReadMany) + .block(); + + if (shouldRetryCrossRegion) { + assertThat(cosmosDiagnostics).isNotNull(); + assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull(); + + CosmosDiagnosticsContext diagnosticsContext = cosmosDiagnostics.getDiagnosticsContext(); + + assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(2); + assertThat(diagnosticsContext.getStatusCode()).isLessThan(HttpConstants.StatusCodes.BADREQUEST); + + if (operationType.isReadOnlyOperation()) { + validateCosmosDiagnosticsForMultiErrorCodesInQuorumRead(diagnosticsContext, Arrays.asList(410, 429)); + } + } else { + assertThat(cosmosDiagnostics).isNotNull(); + assertThat(cosmosDiagnostics.getDiagnosticsContext()).isNotNull(); + + CosmosDiagnosticsContext diagnosticsContext = cosmosDiagnostics.getDiagnosticsContext(); + + assertThat(diagnosticsContext.getContactedRegionNames().size()).isEqualTo(1); + assertThat(diagnosticsContext.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.SERVICE_UNAVAILABLE); + assertThat(diagnosticsContext.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); + + if (operationType.isReadOnlyOperation()) { + validateCosmosDiagnosticsForMultiErrorCodesInQuorumRead(diagnosticsContext, Arrays.asList(410, 429)); + } + } } finally { leaseNotFoundFaultRule.disable(); + tooManyRequestsRule.disable(); if (testClient != null) { cleanUpContainer(testContainer); @@ -697,7 +847,7 @@ private Mono performDocumentOperation( .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { CosmosException cosmosException = (CosmosException) throwable; - + return Mono.just(cosmosException.getDiagnostics()); } return Mono.error(throwable); @@ -759,7 +909,9 @@ private Mono performDocumentOperation( } if (operationType == OperationType.Create) { - return cosmosAsyncContainer.createItem(TestObject.create()).map(itemResponse -> itemResponse.getDiagnostics()) .onErrorResume(throwable -> { + return cosmosAsyncContainer.createItem(TestObject.create()) + .map(itemResponse -> itemResponse.getDiagnostics()) + .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { CosmosException cosmosException = (CosmosException) throwable; @@ -770,7 +922,9 @@ private Mono performDocumentOperation( } if (operationType == OperationType.Upsert) { - return cosmosAsyncContainer.upsertItem(TestObject.create()).map(itemResponse -> itemResponse.getDiagnostics()) .onErrorResume(throwable -> { + return cosmosAsyncContainer.upsertItem(TestObject.create()) + .map(itemResponse -> itemResponse.getDiagnostics()) + .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { CosmosException cosmosException = (CosmosException) throwable; @@ -852,6 +1006,24 @@ private Mono performDocumentOperation( } } + private void validateCosmosDiagnosticsForMultiErrorCodesInQuorumRead( + CosmosDiagnosticsContext cosmosDiagnosticsContext, + List expectedStatusCodes) { + + assertThat(cosmosDiagnosticsContext).isNotNull(); + + List actualStatusCodes = cosmosDiagnosticsContext.getDiagnostics() + .stream() + .map(diagnostics -> cosmosDiagnosticsAccessor.getClientSideRequestStatistics(diagnostics)) + .flatMap(clientSideRequestStatisticsCollection -> clientSideRequestStatisticsCollection.stream().map(ClientSideRequestStatistics::getResponseStatisticsList)) + .map(storeResponseStatisticsCollection -> storeResponseStatisticsCollection.stream().map(storeResponseStatistics -> storeResponseStatistics.getStoreResult().getStoreResponseDiagnostics()).collect(Collectors.toCollection(ArrayList::new))) + .map(storeResponseDiagnosticsCollection -> storeResponseDiagnosticsCollection.stream().map(StoreResponseDiagnostics::getStatusCode).collect(Collectors.toCollection(ArrayList::new))) + .flatMap(ArrayList::stream) + .collect(Collectors.toCollection(ArrayList::new)); + + assertThat(actualStatusCodes).containsAll(expectedStatusCodes); + } + private AccountLevelLocationContext getAccountLevelLocationContext(DatabaseAccount databaseAccount, boolean writeOnly) { Iterator locationIterator = writeOnly ? databaseAccount.getWritableLocations().iterator() : databaseAccount.getReadableLocations().iterator(); diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 2b915b75886e..e1eb22bc3836 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -12,6 +12,7 @@ * Fixed Null Pointer Exception for query when container recreated with same name. - [PR 45930](https://github.com/Azure/azure-sdk-for-java/pull/45930) * Fixed Null Pointer Exception for readMany when container recreated with same name. - [PR 45930](https://github.com/Azure/azure-sdk-for-java/pull/45930) * Fixed parameterized query failures for Hybrid Search queries. - [PR 46446](https://github.com/Azure/azure-sdk-for-java/pull/46446) +* Fixed Strong Consistency violation when a single replica in a partition returns a 410 `Lease Not Found`. - [PR 46433](https://github.com/Azure/azure-sdk-for-java/pull/46433) #### Other Changes * Added change to optimize lease checkpointing in `ChangeFeedProcessor` by conditionally executing checkpoint operations for 304 responses based on continuation token comparison, which helps to reduce RU consumption for unchanged feeds. See [PR 46521](https://github.com/Azure/azure-sdk-for-java/pull/46521) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java index a77d84d9d7f2..049603aa6f26 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Exceptions.java @@ -65,4 +65,9 @@ public static boolean isStaledResourceException(int statusCode, int subStatusCod || (statusCode == HttpConstants.StatusCodes.GONE && subStatusCode == HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE); } + + public static boolean isAvoidQuorumSelectionException(CosmosException cosmosException) { + return Exceptions.isStatusCode(cosmosException, HttpConstants.StatusCodes.GONE) + && Exceptions.isSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index 7f82b3c5e927..4c71d49fd5e6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -85,6 +85,7 @@ public class RxDocumentServiceRequest implements Cloneable { public volatile Map properties; public String throughputControlGroupName; public volatile boolean intendedCollectionRidPassedIntoSDK = false; + public volatile boolean isBarrierRequest = false; private volatile Duration responseTimeout; private volatile boolean nonIdempotentWriteRetriesEnabled = false; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierRequestHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierRequestHelper.java index 7f6b2e497c5d..a39546e485cf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierRequestHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/BarrierRequestHelper.java @@ -142,6 +142,8 @@ public static Mono createAsync( barrierLsnRequest.getHeaders().put(WFConstants.BackendHeaders.COLLECTION_RID, request.getHeaders().get(WFConstants.BackendHeaders.COLLECTION_RID)); } + barrierLsnRequest.isBarrierRequest = true; + if (hasAadToken) { return authorizationTokenProvider.populateAuthorizationHeader(barrierLsnRequest); } else { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java index 336b0241ae96..04ba424975e3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -231,7 +231,7 @@ public Mono shouldRetry(Exception exception) { exception); return Mono.just(ShouldRetryResult.noRetry(exceptionToThrow, - Quadruple.with(true, false, Duration.ofMillis(0), this.attemptCount.get()))); + Quadruple.with(true, true, Duration.ofMillis(0), this.attemptCount.get()))); } long remainingSeconds = this.waitTimeInSeconds - diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index f43e3bcb02a7..1f01e66076d6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -12,6 +12,7 @@ import com.azure.cosmos.implementation.Exceptions; import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalServerErrorException; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.IAuthorizationTokenProvider; @@ -30,6 +31,7 @@ import java.time.Duration; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -153,6 +155,14 @@ public Mono readStrongAsync( secondaryQuorumReadResult -> { switch (secondaryQuorumReadResult.quorumResult) { + case QuorumNotPossibleInCurrentRegion: + try { + logger.warn("QuorumNotPossibleInCurrentRegion: ReadQuorumResult StoreResponses: {}", + String.join(";", secondaryQuorumReadResult.storeResponses)); + return Flux.error(secondaryQuorumReadResult.failFastException); + } catch (CosmosException e) { + return Flux.error(e); + } case QuorumMet: try { return Flux.just(secondaryQuorumReadResult.getResponse()); @@ -302,6 +312,7 @@ private Mono readQuorumAsync( return ensureQuorumSelectedStoreResponse(entity, readQuorum, includePrimary, readMode).flatMap( res -> { + if (res.getLeft() != null) { // no need for barrier return Mono.just(res.getKey()); @@ -326,7 +337,8 @@ private Mono readQuorumAsync( readLsn, globalCommittedLSN, storeResult, - storeResponses)); + storeResponses, + null)); } return Mono.just(new ReadQuorumResult( @@ -335,7 +347,8 @@ private Mono readQuorumAsync( readLsn, globalCommittedLSN, storeResult, - storeResponses)); + storeResponses, + null)); } ); } @@ -356,8 +369,11 @@ private Mono { StoreResponse storeResponse = response.getStoreResponse(); - if (storeResponse == null) { - return response.storePhysicalAddress + " -> n/a"; + if (storeResponse == null && response.getException() != null) { + + CosmosException cosmosException = response.getException(); + + return response.storePhysicalAddress + " -> " + "(" + cosmosException.getStatusCode() + "-" + cosmosException.getSubStatusCode() + ")"; } return response.storePhysicalAddress @@ -368,11 +384,36 @@ private Mono firstStoreResultWithIsAvoidQuorumSelectionException = responseResult + .stream() + .filter(response -> response.isAvoidQuorumSelectionException) + .findFirst(); + + if (firstStoreResultWithIsAvoidQuorumSelectionException.isPresent()) { + StoreResult storeResult = firstStoreResultWithIsAvoidQuorumSelectionException.get(); + logger.warn("Replica with address [{}] responded with error code [{}] " + + "because of which quorum selection against regional endpoint [{}] for operation is not possible!", + storeResult.storePhysicalAddress.toString(), + storeResult.getException().getStatusCode() + ":" + storeResult.getException().getSubStatusCode(), + entity.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString()); + + return Mono.just(Pair.of(new ReadQuorumResult( + entity.requestContext.requestChargeTracker, + ReadQuorumResultKind.QuorumNotPossibleInCurrentRegion, + -1, + -1, + null, + storeResponses, + storeResult.getException()), null)); + } + int responseCount = (int) responseResult.stream().filter(response -> response.isValid).count(); + if (responseCount < readQuorum) { return Mono.just(Pair.of(new ReadQuorumResult(entity.requestContext.requestChargeTracker, ReadQuorumResultKind.QuorumNotSelected, - -1, -1, null, storeResponses), null)); + -1, -1, null, storeResponses, null), null)); } //either request overrides consistency level with strong, or request does not @@ -404,7 +445,8 @@ private Mono storeResponses) { + List storeResponses, + CosmosException failFastException) { super(requestChargeTracker, selectedResponse); this.quorumResult = QuorumResult; @@ -826,6 +870,7 @@ public ReadQuorumResult( this.globalCommittedSelectedLsn = globalCommittedSelectedLsn; this.selectedResponse = selectedResponse; this.storeResponses = storeResponses; + this.failFastException = failFastException; } public final ReadQuorumResultKind quorumResult; @@ -844,6 +889,8 @@ public ReadQuorumResult( public final long globalCommittedSelectedLsn; + public final CosmosException failFastException; + protected boolean isValidResult() { return this.quorumResult == ReadQuorumResultKind.QuorumMet || this.quorumResult == ReadQuorumResultKind.QuorumSelected; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index 000c683b056c..5169dfb121a7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -278,6 +278,41 @@ private Flux> readFromReplicas(List resultCollect return Mono.error(e); }).map(newStoreResults -> { for (StoreResult srr : newStoreResults) { + + if (srr.isAvoidQuorumSelectionException) { + + // todo: fail fast when barrier requests also hit isAvoidQuorumSelectionException? + // todo: https://github.com/Azure/azure-sdk-for-java/issues/46135 + if (!entity.isBarrierRequest) { + + // isAvoidQuorumSelectionException is a special case where we want to enable the enclosing data plane operation + // to fail fast in the region where a quorum selection is being attempted + // no attempts to reselect quorum will be made + if (logger.isDebugEnabled()) { + + int statusCode = srr.getException() != null ? srr.getException().getStatusCode() : 0; + int subStatusCode = srr.getException() != null ? srr.getException().getSubStatusCode() : 0; + + logger.debug("An exception with error code [{}-{}] was observed which means quorum cannot be attained in the current region!", statusCode, subStatusCode); + } + + if (!entity.requestContext.performedBackgroundAddressRefresh) { + this.startBackgroundAddressRefresh(entity); + entity.requestContext.performedBackgroundAddressRefresh = true; + } + + // (collect quorum store results if possible) + // for QuorumReader (upstream) to make the final decision on quorum selection + resultCollector.add(srr); + + // Remaining replicas + replicasToRead.set(replicaCountToRead - resultCollector.size()); + + // continue to the next store result + continue; + } + } + if (srr.isValid) { try { @@ -1000,7 +1035,7 @@ StoreResult createStoreResult(StoreResponse storeResponse, /* currentReplicaSetSize: */ currentReplicaSetSize, /* currentWriteQuorum: */ currentWriteQuorum, /* isValid: */!requiresValidLsn - || ((cosmosException.getStatusCode() != HttpConstants.StatusCodes.GONE || isSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE) || isSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.LEASE_NOT_FOUND)) + || ((cosmosException.getStatusCode() != HttpConstants.StatusCodes.GONE || isSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.NAME_CACHE_IS_STALE)) && lsn >= 0), // TODO: verify where exception.RequestURI is supposed to be set in .Net /* storePhysicalAddress: */ storePhysicalAddress == null diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java index 2a39a3d07497..88531ae7828e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResult.java @@ -36,6 +36,7 @@ public class StoreResult { final public boolean isGoneException; final public boolean isNotFoundException; final public boolean isInvalidPartitionException; + final public boolean isAvoidQuorumSelectionException; final public Uri storePhysicalAddress; final public boolean isThroughputControlRequestRateTooLargeException; final public Double backendLatencyInMs; @@ -83,6 +84,7 @@ public StoreResult( this.isThroughputControlRequestRateTooLargeException = this.exception != null && Exceptions.isThroughputControlRequestRateTooLargeException(this.exception); this.backendLatencyInMs = backendLatencyInMs; this.retryAfterInMs = retryAfterInMs; + this.isAvoidQuorumSelectionException = this.exception != null && Exceptions.isAvoidQuorumSelectionException(this.exception); } public StoreResponse getStoreResponse() {