From 3dd5e41eaadbd0297cb7b85cc58e91b47fc9fc66 Mon Sep 17 00:00:00 2001 From: Shay Rojansky Date: Wed, 26 Jun 2024 15:40:48 +0200 Subject: [PATCH 1/6] Implement Cosmos pagination Closes #24513 --- .../Diagnostics/CosmosQueryEventData.cs | 4 +- .../CosmosQueryExecutedEventData.cs | 4 +- .../Internal/CosmosLoggerExtensions.cs | 8 +- .../Extensions/CosmosQueryableExtensions.cs | 98 +++++++ ...yableMethodTranslatingExpressionVisitor.cs | 58 ++++ ...ressionVisitor.PagingQueryingEnumerable.cs | 265 ++++++++++++++++++ ...ingExpressionVisitor.QueryingEnumerable.cs | 10 +- ...osShapedQueryCompilingExpressionVisitor.cs | 126 ++++++--- .../Internal/Expressions/PagingExpression.cs | 127 +++++++++ .../Storage/Internal/CosmosClientWrapper.cs | 246 +++++++++++----- .../Storage/Internal/ICosmosClientWrapper.cs | 14 +- .../NorthwindMiscellaneousQueryCosmosTest.cs | 59 ++++ 12 files changed, 881 insertions(+), 138 deletions(-) create mode 100644 src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs create mode 100644 src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs diff --git a/src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs b/src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs index fca1bea3489..65e0e729503 100644 --- a/src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs +++ b/src/EFCore.Cosmos/Diagnostics/CosmosQueryEventData.cs @@ -25,7 +25,7 @@ public CosmosQueryEventData( EventDefinitionBase eventDefinition, Func messageGenerator, string containerId, - PartitionKey partitionKeyValue, + PartitionKey? partitionKeyValue, IReadOnlyList<(string Name, object? Value)> parameters, string querySql, bool logSensitiveData) @@ -46,7 +46,7 @@ public CosmosQueryEventData( /// /// The key of the Cosmos partition that the query is using. /// - public virtual PartitionKey PartitionKeyValue { get; } + public virtual PartitionKey? PartitionKeyValue { get; } /// /// Name/values for each parameter in the Cosmos Query. diff --git a/src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs b/src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs index 2934ac9e71f..351a250b310 100644 --- a/src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs +++ b/src/EFCore.Cosmos/Diagnostics/CosmosQueryExecutedEventData.cs @@ -31,7 +31,7 @@ public CosmosQueryExecutedEventData( double requestCharge, string activityId, string containerId, - PartitionKey partitionKeyValue, + PartitionKey? partitionKeyValue, IReadOnlyList<(string Name, object? Value)> parameters, string querySql, bool logSensitiveData) @@ -70,7 +70,7 @@ public CosmosQueryExecutedEventData( /// /// The key of the Cosmos partition that the query is using. /// - public virtual PartitionKey PartitionKeyValue { get; } + public virtual PartitionKey? PartitionKeyValue { get; } /// /// Name/values for each parameter in the Cosmos Query. diff --git a/src/EFCore.Cosmos/Diagnostics/Internal/CosmosLoggerExtensions.cs b/src/EFCore.Cosmos/Diagnostics/Internal/CosmosLoggerExtensions.cs index 32e4f91c1f5..1689ca7b98a 100644 --- a/src/EFCore.Cosmos/Diagnostics/Internal/CosmosLoggerExtensions.cs +++ b/src/EFCore.Cosmos/Diagnostics/Internal/CosmosLoggerExtensions.cs @@ -54,7 +54,7 @@ public static void SyncNotSupported( public static void ExecutingSqlQuery( this IDiagnosticsLogger diagnostics, string containerId, - PartitionKey partitionKeyValue, + PartitionKey? partitionKeyValue, CosmosSqlQuery cosmosSqlQuery) { var definition = CosmosResources.LogExecutingSqlQuery(diagnostics); @@ -66,7 +66,7 @@ public static void ExecutingSqlQuery( definition.Log( diagnostics, containerId, - logSensitiveData ? partitionKeyValue.ToString() : "?", + logSensitiveData ? partitionKeyValue?.ToString() : "?", FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0), Environment.NewLine, cosmosSqlQuery.Query); @@ -158,7 +158,7 @@ public static void ExecutedReadNext( double requestCharge, string activityId, string containerId, - PartitionKey partitionKeyValue, + PartitionKey? partitionKeyValue, CosmosSqlQuery cosmosSqlQuery) { var definition = CosmosResources.LogExecutedReadNext(diagnostics); @@ -177,7 +177,7 @@ public static void ExecutedReadNext( requestCharge, activityId, containerId, - logSensitiveData ? partitionKeyValue.ToString() : "?", + logSensitiveData ? partitionKeyValue?.ToString() : "?", FormatParameters(cosmosSqlQuery.Parameters, logSensitiveData && cosmosSqlQuery.Parameters.Count > 0), Environment.NewLine, cosmosSqlQuery.Query)); diff --git a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs index b974d714a92..c7b8e89eb90 100644 --- a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs +++ b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using Azure; using JetBrains.Annotations; using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal; using Microsoft.EntityFrameworkCore.Query.Internal; @@ -17,6 +18,8 @@ namespace Microsoft.EntityFrameworkCore; /// public static class CosmosQueryableExtensions { + #region WithPartitionKey + internal static readonly MethodInfo WithPartitionKeyMethodInfo = typeof(CosmosQueryableExtensions).GetTypeInfo() .GetDeclaredMethods(nameof(WithPartitionKey)) @@ -74,6 +77,10 @@ source.Provider is EntityQueryProvider : source; } + #endregion WithPartitionKey + + #region FromSql + /// /// Creates a LINQ query based on an interpolated string representing a SQL query. /// @@ -177,4 +184,95 @@ private static FromSqlQueryRootExpression GenerateFromSqlQueryRoot( sql, Expression.Constant(arguments)); } + + #endregion FromSql + + #region ToPage + + internal static readonly MethodInfo ToPageAsyncMethodInfo + = typeof(CosmosQueryableExtensions).GetTypeInfo() + .GetDeclaredMethods(nameof(ToPageAsync)) + .Single(); + + internal static readonly MethodInfo ToPageMethodInfo + = typeof(CosmosQueryableExtensions).GetTypeInfo() + .GetDeclaredMethods(nameof(ToPage)) + .Single(); + + /// + /// Allows paginating through query results by repeatedly executing the same query, passing continuation tokens to retrieve + /// successive pages of the result set, and specifying the maximum number of results per page. + /// + /// The source query. + /// + /// An optional continuation token returned from a previous execution of this query via . + /// If , retrieves query results from the start. + /// + /// + /// The maximum number of results in the returned . The page may contain fewer results of the database + /// did not contain enough matching results. + /// + /// Limits the length of continuation token in the query response. + /// A containing at most results. + public static Page ToPage( + this IQueryable source, + string? continuationToken = null, + int? maxItemCount = null, + int? responseContinuationTokenLimitInKb = null) + => source.Provider.Execute>( + Expression.Call( + instance: null, + method: ToPageMethodInfo.MakeGenericMethod(typeof(TSource)), + arguments: + [ + source.Expression, + Expression.Constant(continuationToken, typeof(string)), + Expression.Constant(maxItemCount, typeof(int?)), + Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)) + ])); + + /// + /// Allows paginating through query results by repeatedly executing the same query, passing continuation tokens to retrieve + /// successive pages of the result set, and specifying the maximum number of results per page. + /// + /// The source query. + /// + /// An optional continuation token returned from a previous execution of this query via . + /// If , retrieves query results from the start. + /// + /// + /// The maximum number of results in the returned . The page may contain fewer results of the database + /// did not contain enough matching results. + /// + /// Limits the length of continuation token in the query response. + /// A to observe while waiting for the task to complete. + /// A containing at most results. + public static Task> ToPageAsync( + this IQueryable source, + string? continuationToken = null, + int? maxItemCount = null, + int? responseContinuationTokenLimitInKb = null, + CancellationToken cancellationToken = default) + { + if (source.Provider is not IAsyncQueryProvider provider) + { + throw new InvalidOperationException(CoreStrings.IQueryableProviderNotAsync); + } + + return provider.ExecuteAsync>>( + Expression.Call( + instance: null, + method: ToPageAsyncMethodInfo.MakeGenericMethod(typeof(TSource)), + arguments: + [ + source.Expression, + Expression.Constant(continuationToken, typeof(string)), + Expression.Constant(maxItemCount, typeof(int?)), + Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)), + Expression.Constant(default(CancellationToken), typeof(CancellationToken)) + ]), + cancellationToken); + } + + #endregion ToPage } diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs index 3ba1654c57e..4baf6d174bc 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs @@ -4,6 +4,7 @@ using System.Diagnostics.CodeAnalysis; using Microsoft.EntityFrameworkCore.Cosmos.Internal; using Microsoft.EntityFrameworkCore.Cosmos.Metadata.Internal; +using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions; using Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal; using Microsoft.EntityFrameworkCore.Internal; @@ -86,6 +87,63 @@ protected CosmosQueryableMethodTranslatingExpressionVisitor( _subquery = true; } + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public override Expression Translate(Expression expression) + { + // Handle ToPageAsync(), which can only ever be the top-level node in the query tree. + if (expression is MethodCallExpression { Method: var method, Arguments: var arguments } + && method.DeclaringType == typeof(CosmosQueryableExtensions) + && method.Name is nameof(CosmosQueryableExtensions.ToPage) or nameof(CosmosQueryableExtensions.ToPageAsync)) + { + var source = base.Translate(arguments[0]); + + if (source == QueryCompilationContext.NotTranslatedExpression) + { + return source; + } + + if (source is not ShapedQueryExpression shapedQuery) + { + throw new UnreachableException($"Expected a ShapedQueryExpression but found {source.GetType().Name}"); + } + + // The arguments to ToPage/ToPageAsync must have been parameterized by the funcletizer, since they're non-lambda arguments to + // a top-level function (like Skip/Take). Translate to get these as SqlParameterExpressions. + if (arguments is not + [ + _, // source + ParameterExpression continuationToken, + ParameterExpression maxItemCount, + ParameterExpression responseContinuationTokenLimitInKb, + .. + ] + || _sqlTranslator.Translate(continuationToken) is not SqlParameterExpression translatedContinuationToken + || _sqlTranslator.Translate(maxItemCount) is not SqlParameterExpression translatedMaxItemCount + || _sqlTranslator.Translate(responseContinuationTokenLimitInKb) is not SqlParameterExpression + translatedResponseContinuationTokenLimitInKb) + { + throw new UnreachableException("ToPageAsync without the appropriate parameterized arguments"); + } + + // Wrap the shaper for the entire query in a PagingExpression which also contains the paging arguments, and update + // the final cardinality to Single (since we'll be returning a single Page). + return shapedQuery + .UpdateShaperExpression(new PagingExpression( + shapedQuery.ShaperExpression, + translatedContinuationToken, + translatedMaxItemCount, + translatedResponseContinuationTokenLimitInKb)) + .UpdateResultCardinality(ResultCardinality.Single); + } + + return base.Translate(expression); + } + /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to /// the same compatibility standards as public APIs. It may be changed or removed without notice in diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs new file mode 100644 index 00000000000..d905b3e9e14 --- /dev/null +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs @@ -0,0 +1,265 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +#nullable disable + +using System.Collections; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using System.Text; +using Azure; +using Microsoft.EntityFrameworkCore.Cosmos.Diagnostics.Internal; +using Microsoft.EntityFrameworkCore.Cosmos.Internal; +using Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microsoft.EntityFrameworkCore.Cosmos.Query.Internal; + +/// +/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to +/// the same compatibility standards as public APIs. It may be changed or removed without notice in +/// any release. You should only use it directly in your code with extreme caution and knowing that +/// doing so can result in application failures when updating to a new Entity Framework Core release. +/// +public partial class CosmosShapedQueryCompilingExpressionVisitor +{ + private sealed class PagingQueryingEnumerable : IEnumerable>, IAsyncEnumerable> + { + private readonly CosmosQueryContext _cosmosQueryContext; + private readonly ISqlExpressionFactory _sqlExpressionFactory; + private readonly SelectExpression _selectExpression; + private readonly Func _shaper; + private readonly IQuerySqlGeneratorFactory _querySqlGeneratorFactory; + private readonly Type _contextType; + private readonly string _cosmosContainer; + private readonly PartitionKey _cosmosPartitionKeyValue; + private readonly IDiagnosticsLogger _queryLogger; + private readonly IDiagnosticsLogger _commandLogger; + private readonly bool _standAloneStateManager; + private readonly bool _threadSafetyChecksEnabled; + private readonly string _continuationTokenParameterName; + private readonly string _maxItemCountParameterName; + private readonly string _responseContinuationTokenLimitInKbParameterName; + + public PagingQueryingEnumerable( + CosmosQueryContext cosmosQueryContext, + ISqlExpressionFactory sqlExpressionFactory, + IQuerySqlGeneratorFactory querySqlGeneratorFactory, + SelectExpression selectExpression, + Func shaper, + Type contextType, + string cosmosContainer, + PartitionKey partitionKeyValueFromExtension, + bool standAloneStateManager, + bool threadSafetyChecksEnabled, + string continuationTokenParameterName, + string maxItemCountParameterName, + string responseContinuationTokenLimitInKbParameterName) + { + _cosmosQueryContext = cosmosQueryContext; + _sqlExpressionFactory = sqlExpressionFactory; + _querySqlGeneratorFactory = querySqlGeneratorFactory; + _selectExpression = selectExpression; + _shaper = shaper; + _contextType = contextType; + _queryLogger = cosmosQueryContext.QueryLogger; + _commandLogger = cosmosQueryContext.CommandLogger; + _standAloneStateManager = standAloneStateManager; + _threadSafetyChecksEnabled = threadSafetyChecksEnabled; + _continuationTokenParameterName = continuationTokenParameterName; + _maxItemCountParameterName = maxItemCountParameterName; + _responseContinuationTokenLimitInKbParameterName = responseContinuationTokenLimitInKbParameterName; + + var partitionKey = selectExpression.GetPartitionKeyValue(cosmosQueryContext.ParameterValues); + if (partitionKey != PartitionKey.None + && partitionKeyValueFromExtension != PartitionKey.None + && !partitionKeyValueFromExtension.Equals(partitionKey)) + { + throw new InvalidOperationException(CosmosStrings.PartitionKeyMismatch(partitionKeyValueFromExtension, partitionKey)); + } + + _cosmosPartitionKeyValue = partitionKey != PartitionKey.None ? partitionKey : partitionKeyValueFromExtension; + _cosmosContainer = cosmosContainer; + } + + public IEnumerator> GetEnumerator() + => new Enumerator(this); + + IEnumerator IEnumerable.GetEnumerator() + => GetEnumerator(); + + public IAsyncEnumerator> GetAsyncEnumerator(CancellationToken cancellationToken = default) + => new Enumerator(this, cancellationToken); + + private CosmosSqlQuery GenerateQuery() + => _querySqlGeneratorFactory.Create().GetSqlQuery( + (SelectExpression)new InExpressionValuesExpandingExpressionVisitor( + _sqlExpressionFactory, + _cosmosQueryContext.ParameterValues) + .Visit(_selectExpression), + _cosmosQueryContext.ParameterValues); + + private sealed class Enumerator : IEnumerator>, IAsyncEnumerator> + { + private readonly PagingQueryingEnumerable _queryingEnumerable; + private readonly CosmosQueryContext _cosmosQueryContext; + private readonly Func _shaper; + private readonly Type _contextType; + private readonly string _cosmosContainer; + private readonly PartitionKey _cosmosPartitionKeyValue; + private readonly IDiagnosticsLogger _queryLogger; + private readonly IDiagnosticsLogger _commandLogger; + private readonly bool _standAloneStateManager; + private readonly CancellationToken _cancellationToken; + private readonly IConcurrencyDetector _concurrencyDetector; + private readonly IExceptionDetector _exceptionDetector; + + private bool _hasExecuted; + private bool _isDisposed; + + public Enumerator(PagingQueryingEnumerable queryingEnumerable, CancellationToken cancellationToken = default) + { + _queryingEnumerable = queryingEnumerable; + _cosmosQueryContext = queryingEnumerable._cosmosQueryContext; + _shaper = queryingEnumerable._shaper; + _contextType = queryingEnumerable._contextType; + _cosmosContainer = queryingEnumerable._cosmosContainer; + _cosmosPartitionKeyValue = queryingEnumerable._cosmosPartitionKeyValue; + _queryLogger = queryingEnumerable._queryLogger; + _commandLogger = queryingEnumerable._commandLogger; + _standAloneStateManager = queryingEnumerable._standAloneStateManager; + _exceptionDetector = _cosmosQueryContext.ExceptionDetector; + _cancellationToken = cancellationToken; + + _concurrencyDetector = queryingEnumerable._threadSafetyChecksEnabled + ? _cosmosQueryContext.ConcurrencyDetector + : null; + } + + public Page Current { get; private set; } + + object IEnumerator.Current => Current; + + public bool MoveNext() + => MoveNextCore().GetAwaiter().GetResult(); + + public ValueTask MoveNextAsync() + => new(MoveNextCore()); + + private async Task MoveNextCore() + { + ObjectDisposedException.ThrowIf(_isDisposed, typeof(Enumerator)); + + try + { + _concurrencyDetector?.EnterCriticalSection(); + + try + { + if (_hasExecuted) + { + return false; + } + + _hasExecuted = true; + + var continuationToken = + (string)_cosmosQueryContext.ParameterValues[_queryingEnumerable._continuationTokenParameterName]; + var maxItemCount = (int?)_cosmosQueryContext.ParameterValues[_queryingEnumerable._maxItemCountParameterName]; + var responseContinuationTokenLimitInKb = (int?) + _cosmosQueryContext.ParameterValues[_queryingEnumerable._responseContinuationTokenLimitInKbParameterName]; + + var sqlQuery = _queryingEnumerable.GenerateQuery(); + + EntityFrameworkMetricsData.ReportQueryExecuting(); + + var queryRequestOptions = new QueryRequestOptions + { + ResponseContinuationTokenLimitInKb = responseContinuationTokenLimitInKb + }; + + if (_cosmosPartitionKeyValue != PartitionKey.None) + { + queryRequestOptions.PartitionKey = _cosmosPartitionKeyValue; + } + + var cosmosClient = _cosmosQueryContext.CosmosClient; + _commandLogger.ExecutingSqlQuery(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery); + _cosmosQueryContext.InitializeStateManager(_standAloneStateManager); + + var results = maxItemCount.HasValue ? new List(maxItemCount.Value) : []; + + while (maxItemCount is null or > 0) + { + queryRequestOptions.MaxItemCount = maxItemCount; + using var feedIterator = cosmosClient.CreateQuery( + _cosmosContainer, sqlQuery, continuationToken, queryRequestOptions); + + using var responseMessage = await feedIterator.ReadNextAsync(_cancellationToken).ConfigureAwait(false); + + _commandLogger.ExecutedReadNext( + responseMessage.Diagnostics.GetClientElapsedTime(), + responseMessage.Headers.RequestCharge, + responseMessage.Headers.ActivityId, + _cosmosContainer, + _cosmosPartitionKeyValue, + sqlQuery); + + responseMessage.EnsureSuccessStatusCode(); + + var responseMessageEnumerable = cosmosClient.GetResponseMessageEnumerable(responseMessage); + foreach (var resultObject in responseMessageEnumerable) + { + results.Add(_shaper(_cosmosQueryContext, resultObject)); + maxItemCount--; + } + + continuationToken = responseMessage.ContinuationToken; + + if (responseMessage.ContinuationToken is null) + { + break; + } + } + + Current = Page.FromValues(results, continuationToken, null!); // TODO: Response... + + _hasExecuted = true; + return true; + } + finally + { + _concurrencyDetector?.ExitCriticalSection(); + } + } + catch (Exception exception) + { + if (_exceptionDetector.IsCancellation(exception, _cancellationToken)) + { + _queryLogger.QueryCanceled(_contextType); + } + else + { + _queryLogger.QueryIterationFailed(_contextType, exception); + } + + throw; + } + } + + public ValueTask DisposeAsync() + { + Dispose(); + + return default; + } + + public void Dispose() + => _isDisposed = true; + + public void Reset() + => throw new NotSupportedException(); + } + } +} diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.QueryingEnumerable.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.QueryingEnumerable.cs index edf79fdec44..cf0876f47fc 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.QueryingEnumerable.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.QueryingEnumerable.cs @@ -158,10 +158,7 @@ public bool MoveNext() EntityFrameworkMetricsData.ReportQueryExecuting(); _enumerator = _cosmosQueryContext.CosmosClient - .ExecuteSqlQuery( - _cosmosContainer, - _cosmosPartitionKeyValue, - sqlQuery) + .ExecuteSqlQuery(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery) .GetEnumerator(); _cosmosQueryContext.InitializeStateManager(_standAloneStateManager); } @@ -256,10 +253,7 @@ public async ValueTask MoveNextAsync() EntityFrameworkMetricsData.ReportQueryExecuting(); _enumerator = _cosmosQueryContext.CosmosClient - .ExecuteSqlQueryAsync( - _cosmosContainer, - _cosmosPartitionKeyValue, - sqlQuery) + .ExecuteSqlQueryAsync(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery) .GetAsyncEnumerator(_cancellationToken); _cosmosQueryContext.InitializeStateManager(_standAloneStateManager); } diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs index 23a17d2d437..dea15f6877d 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs @@ -3,6 +3,7 @@ #nullable disable +using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions; using Newtonsoft.Json.Linq; using static System.Linq.Expressions.Expression; @@ -43,55 +44,90 @@ protected override Expression VisitShapedQuery(ShapedQueryExpression shapedQuery var jObjectParameter = Parameter(typeof(JObject), "jObject"); var shaperBody = shapedQueryExpression.ShaperExpression; + + var (paging, continuationToken, maxItemCount, responseContinuationTokenLimitInKb) = + (false, (SqlParameterExpression)null, (SqlParameterExpression)null, (SqlParameterExpression)null); + + // If the query is terminated ToPageAsync(), CosmosQueryableMethodTranslatingExpressionVisitor composed a PagingExpression on top + // of the shaper. We remove that to get the shaper for each actual document being read (as opposed to the page of those documents), + // and extract the pagination arguments. + if (shaperBody is PagingExpression pagingExpression) + { + paging = true; + continuationToken = pagingExpression.ContinuationToken; + maxItemCount = pagingExpression.MaxItemCount; + responseContinuationTokenLimitInKb = pagingExpression.ResponseContinuationTokenLimitInKb; + + shaperBody = pagingExpression.Expression; + } + shaperBody = new JObjectInjectingExpressionVisitor().Visit(shaperBody); shaperBody = InjectEntityMaterializers(shaperBody); - switch (shapedQueryExpression.QueryExpression) + if (shapedQueryExpression.QueryExpression is not SelectExpression selectExpression) { - case SelectExpression selectExpression: - shaperBody = new CosmosProjectionBindingRemovingExpressionVisitor( - selectExpression, jObjectParameter, - QueryCompilationContext.QueryTrackingBehavior == QueryTrackingBehavior.TrackAll) - .Visit(shaperBody); - - var shaperLambda = Lambda( - shaperBody, - QueryCompilationContext.QueryContextParameter, - jObjectParameter); - - var cosmosQueryContextConstant = Convert(QueryCompilationContext.QueryContextParameter, typeof(CosmosQueryContext)); - var shaperConstant = Constant(shaperLambda.Compile()); - var contextTypeConstant = Constant(_contextType); - var containerConstant = Constant(cosmosQueryCompilationContext.CosmosContainer); - var threadSafetyConstant = Constant(_threadSafetyChecksEnabled); - var standAloneStateManagerConstant = Constant( - QueryCompilationContext.QueryTrackingBehavior == QueryTrackingBehavior.NoTrackingWithIdentityResolution); - - return selectExpression.ReadItemInfo != null - ? New( - typeof(ReadItemQueryingEnumerable<>).MakeGenericType(selectExpression.ReadItemInfo.Type).GetConstructors()[0], - cosmosQueryContextConstant, - containerConstant, - Constant(selectExpression.ReadItemInfo), - shaperConstant, - contextTypeConstant, - standAloneStateManagerConstant, - threadSafetyConstant) - : New( - typeof(QueryingEnumerable<>).MakeGenericType(shaperLambda.ReturnType).GetConstructors()[0], - cosmosQueryContextConstant, - Constant(sqlExpressionFactory), - Constant(querySqlGeneratorFactory), - Constant(selectExpression), - shaperConstant, - contextTypeConstant, - containerConstant, - Constant(_partitionKeyValueFromExtension, typeof(PartitionKey)), - standAloneStateManagerConstant, - threadSafetyConstant); - - default: - throw new NotSupportedException(CoreStrings.UnhandledExpressionNode(shapedQueryExpression.QueryExpression)); + throw new NotSupportedException(CoreStrings.UnhandledExpressionNode(shapedQueryExpression.QueryExpression)); } + + shaperBody = new CosmosProjectionBindingRemovingExpressionVisitor( + selectExpression, jObjectParameter, + QueryCompilationContext.QueryTrackingBehavior == QueryTrackingBehavior.TrackAll) + .Visit(shaperBody); + + var shaperLambda = Lambda( + shaperBody, + QueryCompilationContext.QueryContextParameter, + jObjectParameter); + + var cosmosQueryContextConstant = Convert(QueryCompilationContext.QueryContextParameter, typeof(CosmosQueryContext)); + var shaperConstant = Constant(shaperLambda.Compile()); + var contextTypeConstant = Constant(_contextType); + var containerConstant = Constant(cosmosQueryCompilationContext.CosmosContainer); + var threadSafetyConstant = Constant(_threadSafetyChecksEnabled); + var standAloneStateManagerConstant = Constant( + QueryCompilationContext.QueryTrackingBehavior == QueryTrackingBehavior.NoTrackingWithIdentityResolution); + + Check.DebugAssert(!paging || selectExpression.ReadItemInfo is null, "ReadItem is being with paging, impossible."); + + return selectExpression switch + { + { ReadItemInfo: ReadItemInfo readItemInfo } => New( + typeof(ReadItemQueryingEnumerable<>).MakeGenericType(readItemInfo.Type).GetConstructors()[0], + cosmosQueryContextConstant, + containerConstant, + Constant(readItemInfo), + shaperConstant, + contextTypeConstant, + standAloneStateManagerConstant, + threadSafetyConstant), + + _ when paging => New( + typeof(PagingQueryingEnumerable<>).MakeGenericType(shaperLambda.ReturnType).GetConstructors()[0], + cosmosQueryContextConstant, + Constant(sqlExpressionFactory), + Constant(querySqlGeneratorFactory), + Constant(selectExpression), + shaperConstant, + contextTypeConstant, + containerConstant, + Constant(_partitionKeyValueFromExtension, typeof(PartitionKey)), + standAloneStateManagerConstant, + threadSafetyConstant, + Constant(continuationToken.Name), + Constant(maxItemCount.Name), + Constant(responseContinuationTokenLimitInKb.Name)), + + _ => New( + typeof(QueryingEnumerable<>).MakeGenericType(shaperLambda.ReturnType).GetConstructors()[0], cosmosQueryContextConstant, + Constant(sqlExpressionFactory), + Constant(querySqlGeneratorFactory), + Constant(selectExpression), + shaperConstant, + contextTypeConstant, + containerConstant, + Constant(_partitionKeyValueFromExtension, typeof(PartitionKey)), + standAloneStateManagerConstant, + threadSafetyConstant) + }; } } diff --git a/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs b/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs new file mode 100644 index 00000000000..9f63c76a95a --- /dev/null +++ b/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs @@ -0,0 +1,127 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions; + +/// +/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to +/// the same compatibility standards as public APIs. It may be changed or removed without notice in +/// any release. You should only use it directly in your code with extreme caution and knowing that +/// doing so can result in application failures when updating to a new Entity Framework Core release. +/// +public class PagingExpression( + Expression expression, + SqlParameterExpression continuationToken, + SqlParameterExpression maxItemCount, + SqlParameterExpression responseContinuationTokenLimitInKb) + : Expression, IPrintableExpression +{ + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public sealed override ExpressionType NodeType + => ExpressionType.Extension; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public override Type Type + => Expression.Type; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual Expression Expression { get; } = expression; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual SqlParameterExpression ContinuationToken { get; } = continuationToken; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual SqlParameterExpression MaxItemCount { get; } = maxItemCount; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual SqlParameterExpression ResponseContinuationTokenLimitInKb { get; } = responseContinuationTokenLimitInKb; + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + protected override Expression VisitChildren(ExpressionVisitor visitor) + => Update(visitor.Visit(Expression)); + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual PagingExpression Update(Expression expression) + => expression == Expression + ? this + : new PagingExpression(expression, ContinuationToken, MaxItemCount, ResponseContinuationTokenLimitInKb); + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + void IPrintableExpression.Print(ExpressionPrinter expressionPrinter) + { + expressionPrinter.Append("ToPage: "); + expressionPrinter.Visit(Expression); + } + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public override bool Equals(object? obj) + => obj != null + && (ReferenceEquals(this, obj) + || obj is PagingExpression other + && Equals(other)); + + private bool Equals(PagingExpression other) + => Expression.Equals(other.Expression) + && ContinuationToken.Equals(other.ContinuationToken) + && MaxItemCount.Equals(other.MaxItemCount) + && ResponseContinuationTokenLimitInKb.Equals(other.ResponseContinuationTokenLimitInKb); + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public override int GetHashCode() + => Expression.GetHashCode(); +} diff --git a/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs index 62aa38750f8..0d371c0bf15 100644 --- a/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs @@ -658,8 +658,9 @@ private static Task CreateSingleItemQueryAsync( /// public virtual FeedIterator CreateQuery( string containerId, - PartitionKey partitionKeyValue, - CosmosSqlQuery query) + CosmosSqlQuery query, + string? continuationToken = null, + QueryRequestOptions? queryRequestOptions = null) { var container = Client.GetDatabase(_databaseId).GetContainer(containerId); var queryDefinition = new QueryDefinition(query.Query); @@ -669,14 +670,7 @@ public virtual FeedIterator CreateQuery( queryDefinition, (current, parameter) => current.WithParameter(parameter.Name, parameter.Value)); - if (partitionKeyValue == PartitionKey.None) - { - return container.GetItemQueryStreamIterator(queryDefinition); - } - - var queryRequestOptions = new QueryRequestOptions { PartitionKey = partitionKeyValue }; - - return container.GetItemQueryStreamIterator(queryDefinition, requestOptions: queryRequestOptions); + return container.GetItemQueryStreamIterator(queryDefinition, continuationToken, queryRequestOptions); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -718,24 +712,17 @@ private static bool TryReadJObject(JsonTextReader jsonReader, [NotNullWhen(true) return false; } - private sealed class DocumentEnumerable : IEnumerable + private sealed class DocumentEnumerable( + CosmosClientWrapper cosmosClient, + string containerId, + PartitionKey partitionKeyValue, + CosmosSqlQuery cosmosSqlQuery) + : IEnumerable { - private readonly CosmosClientWrapper _cosmosClient; - private readonly string _containerId; - private readonly PartitionKey _partitionKeyValue; - private readonly CosmosSqlQuery _cosmosSqlQuery; - - public DocumentEnumerable( - CosmosClientWrapper cosmosClient, - string containerId, - PartitionKey partitionKeyValue, - CosmosSqlQuery cosmosSqlQuery) - { - _cosmosClient = cosmosClient; - _containerId = containerId; - _partitionKeyValue = partitionKeyValue; - _cosmosSqlQuery = cosmosSqlQuery; - } + private readonly CosmosClientWrapper _cosmosClient = cosmosClient; + private readonly string _containerId = containerId; + private readonly PartitionKey _partitionKeyValue = partitionKeyValue; + private readonly CosmosSqlQuery _cosmosSqlQuery = cosmosSqlQuery; public IEnumerator GetEnumerator() => new Enumerator(this); @@ -743,12 +730,12 @@ public IEnumerator GetEnumerator() IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - private sealed class Enumerator : IEnumerator + private sealed class Enumerator(DocumentEnumerable documentEnumerable) : IEnumerator { - private readonly CosmosClientWrapper _cosmosClientWrapper; - private readonly string _containerId; - private readonly PartitionKey _partitionKeyValue; - private readonly CosmosSqlQuery _cosmosSqlQuery; + private readonly CosmosClientWrapper _cosmosClientWrapper = documentEnumerable._cosmosClient; + private readonly string _containerId = documentEnumerable._containerId; + private readonly PartitionKey _partitionKeyValue = documentEnumerable._partitionKeyValue; + private readonly CosmosSqlQuery _cosmosSqlQuery = documentEnumerable._cosmosSqlQuery; private JObject? _current; private ResponseMessage? _responseMessage; @@ -758,14 +745,6 @@ private sealed class Enumerator : IEnumerator private FeedIterator? _query; - public Enumerator(DocumentEnumerable documentEnumerable) - { - _cosmosClientWrapper = documentEnumerable._cosmosClient; - _containerId = documentEnumerable._containerId; - _partitionKeyValue = documentEnumerable._partitionKeyValue; - _cosmosSqlQuery = documentEnumerable._cosmosSqlQuery; - } - public JObject Current => _current ?? throw new InvalidOperationException(); @@ -777,7 +756,17 @@ public bool MoveNext() { if (_jsonReader == null) { - _query ??= _cosmosClientWrapper.CreateQuery(_containerId, _partitionKeyValue, _cosmosSqlQuery); + if (_query is null) + { + var queryRequestOptions = new QueryRequestOptions(); + if (_partitionKeyValue != PartitionKey.None) + { + queryRequestOptions.PartitionKey = _partitionKeyValue; + } + + _query = _cosmosClientWrapper.CreateQuery( + _containerId, _cosmosSqlQuery, continuationToken: null, queryRequestOptions); + } if (!_query.HasMoreResults) { @@ -836,35 +825,28 @@ public void Reset() } } - private sealed class DocumentAsyncEnumerable : IAsyncEnumerable + private sealed class DocumentAsyncEnumerable( + CosmosClientWrapper cosmosClient, + string containerId, + PartitionKey partitionKeyValue, + CosmosSqlQuery cosmosSqlQuery) + : IAsyncEnumerable { - private readonly CosmosClientWrapper _cosmosClient; - private readonly string _containerId; - private readonly PartitionKey _partitionKeyValue; - private readonly CosmosSqlQuery _cosmosSqlQuery; - - public DocumentAsyncEnumerable( - CosmosClientWrapper cosmosClient, - string containerId, - PartitionKey partitionKeyValue, - CosmosSqlQuery cosmosSqlQuery) - { - _cosmosClient = cosmosClient; - _containerId = containerId; - _partitionKeyValue = partitionKeyValue; - _cosmosSqlQuery = cosmosSqlQuery; - } + private readonly CosmosClientWrapper _cosmosClient = cosmosClient; + private readonly string _containerId = containerId; + private readonly PartitionKey _partitionKeyValue = partitionKeyValue; + private readonly CosmosSqlQuery _cosmosSqlQuery = cosmosSqlQuery; public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) => new AsyncEnumerator(this, cancellationToken); - private sealed class AsyncEnumerator : IAsyncEnumerator + private sealed class AsyncEnumerator(DocumentAsyncEnumerable documentEnumerable, CancellationToken cancellationToken) + : IAsyncEnumerator { - private readonly CosmosClientWrapper _cosmosClientWrapper; - private readonly string _containerId; - private readonly PartitionKey _partitionKeyValue; - private readonly CosmosSqlQuery _cosmosSqlQuery; - private readonly CancellationToken _cancellationToken; + private readonly CosmosClientWrapper _cosmosClientWrapper = documentEnumerable._cosmosClient; + private readonly string _containerId = documentEnumerable._containerId; + private readonly PartitionKey _partitionKeyValue = documentEnumerable._partitionKeyValue; + private readonly CosmosSqlQuery _cosmosSqlQuery = documentEnumerable._cosmosSqlQuery; private JObject? _current; private ResponseMessage? _responseMessage; @@ -877,23 +859,24 @@ private sealed class AsyncEnumerator : IAsyncEnumerator public JObject Current => _current ?? throw new InvalidOperationException(); - public AsyncEnumerator(DocumentAsyncEnumerable documentEnumerable, CancellationToken cancellationToken) - { - _cosmosClientWrapper = documentEnumerable._cosmosClient; - _containerId = documentEnumerable._containerId; - _partitionKeyValue = documentEnumerable._partitionKeyValue; - _cosmosSqlQuery = documentEnumerable._cosmosSqlQuery; - _cancellationToken = cancellationToken; - } - [MethodImpl(MethodImplOptions.AggressiveInlining)] public async ValueTask MoveNextAsync() { - _cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.ThrowIfCancellationRequested(); if (_jsonReader == null) { - _query ??= _cosmosClientWrapper.CreateQuery(_containerId, _partitionKeyValue, _cosmosSqlQuery); + if (_query is null) + { + var queryRequestOptions = new QueryRequestOptions(); + if (_partitionKeyValue != PartitionKey.None) + { + queryRequestOptions.PartitionKey = _partitionKeyValue; + } + + _query = _cosmosClientWrapper.CreateQuery( + _containerId, _cosmosSqlQuery, continuationToken: null, queryRequestOptions); + } if (!_query.HasMoreResults) { @@ -901,7 +884,7 @@ public async ValueTask MoveNextAsync() return false; } - _responseMessage = await _query.ReadNextAsync(_cancellationToken).ConfigureAwait(false); + _responseMessage = await _query.ReadNextAsync(cancellationToken).ConfigureAwait(false); _cosmosClientWrapper._commandLogger.ExecutedReadNext( _responseMessage.Diagnostics.GetClientElapsedTime(), @@ -948,4 +931,115 @@ public async ValueTask DisposeAsync() } } } + + #region ResponseMessageEnumerable + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + public virtual IEnumerable GetResponseMessageEnumerable(ResponseMessage responseMessage) + => new ResponseMessageEnumerable(responseMessage); + + private sealed class ResponseMessageEnumerable(ResponseMessage responseMessage) : IEnumerable, IAsyncEnumerable + { + public IEnumerator GetEnumerator() + => new ResponseMessageEnumerator(responseMessage); + + IEnumerator IEnumerable.GetEnumerator() + => GetEnumerator(); + + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + => new ResponseMessageAsyncEnumerator(responseMessage); + } + + private sealed class ResponseMessageEnumerator : IEnumerator + { + private readonly Stream _responseStream; + private readonly StreamReader _reader; + private readonly JsonTextReader _jsonReader; + + private JObject? _current; + + public ResponseMessageEnumerator(ResponseMessage responseMessage) + { + _responseStream = responseMessage.Content; + _reader = new StreamReader(_responseStream); + _jsonReader = CreateJsonReader(_reader); + } + + public bool MoveNext() + { + while (_jsonReader.Read()) + { + if (_jsonReader.TokenType == JsonToken.StartObject) + { + _current = Serializer.Deserialize(_jsonReader); + return true; + } + } + + return false; + } + + public JObject Current + => _current ?? throw new InvalidOperationException(); + + object IEnumerator.Current + => Current; + + public void Dispose() + { + _jsonReader.Close(); + _reader.Dispose(); + _responseStream.Dispose(); + } + + public void Reset() + => throw new NotSupportedException(); + } + + private sealed class ResponseMessageAsyncEnumerator : IAsyncEnumerator + { + private readonly Stream _responseStream; + private readonly StreamReader _reader; + private readonly JsonTextReader _jsonReader; + + private JObject? _current; + + public ResponseMessageAsyncEnumerator(ResponseMessage responseMessage) + { + _responseStream = responseMessage.Content; + _reader = new StreamReader(_responseStream); + _jsonReader = CreateJsonReader(_reader); + } + + public async ValueTask MoveNextAsync() + { + while (await _jsonReader.ReadAsync().ConfigureAwait(false)) + { + if (_jsonReader.TokenType == JsonToken.StartObject) + { + _current = Serializer.Deserialize(_jsonReader); + return true; + } + } + + return false; + } + + public JObject Current + => _current ?? throw new InvalidOperationException(); + + public async ValueTask DisposeAsync() + { + _jsonReader.Close(); + _reader.Dispose(); + await _responseStream.DisposeAsync().ConfigureAwait(false); + } + } + + #endregion ResponseMessageEnumerable } diff --git a/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs index fa8ba51f089..20bb42fa670 100644 --- a/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/ICosmosClientWrapper.cs @@ -135,7 +135,11 @@ Task DeleteItemAsync( /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - FeedIterator CreateQuery(string containerId, PartitionKey partitionKeyValue, CosmosSqlQuery query); + FeedIterator CreateQuery( + string containerId, + CosmosSqlQuery query, + string? continuationToken = null, + QueryRequestOptions? queryRequestOptions = null); /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to @@ -181,4 +185,12 @@ IAsyncEnumerable ExecuteSqlQueryAsync( string containerId, PartitionKey partitionKeyValue, CosmosSqlQuery query); + + /// + /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to + /// the same compatibility standards as public APIs. It may be changed or removed without notice in + /// any release. You should only use it directly in your code with extreme caution and knowing that + /// doing so can result in application failures when updating to a new Entity Framework Core release. + /// + IEnumerable GetResponseMessageEnumerable(ResponseMessage responseMessage); } diff --git a/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs b/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs index 41e1d99ceb1..050b78a8fbf 100644 --- a/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs +++ b/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs @@ -5324,6 +5324,65 @@ FROM root c """); }); + [ConditionalFact] + public virtual async Task ToPageAsync() + { + await using var context = CreateContext(); + + var totalCustomers = await context.Set().CountAsync(); + + var page1 = await context.Set() + .OrderBy(c => c.CustomerID) + .ToPageAsync(maxItemCount: 1); + + var customer1 = Assert.Single(page1.Values); + Assert.Equal("ALFKI", customer1.CustomerID); + + var page2 = await context.Set() + .OrderBy(c => c.CustomerID) + .ToPageAsync(continuationToken: page1.ContinuationToken, maxItemCount: 2); + + Assert.Collection( + page2.Values, + c => Assert.Equal("ANATR", c.CustomerID), + c => Assert.Equal("ANTON", c.CustomerID)); + + var page3 = await context.Set() + .OrderBy(c => c.CustomerID) + .ToPageAsync(continuationToken: page2.ContinuationToken); + + Assert.Equal(totalCustomers - 3, page3.Values.Count); + Assert.Null(page3.ContinuationToken); + + AssertSql( + """ +SELECT COUNT(1) AS c +FROM root c +WHERE (c["Discriminator"] = "Customer") +""", + // + """ +SELECT c +FROM root c +WHERE (c["Discriminator"] = "Customer") +ORDER BY c["CustomerID"] +""", + // + """ +SELECT c +FROM root c +WHERE (c["Discriminator"] = "Customer") +ORDER BY c["CustomerID"] +""", + // + """ +SELECT c +FROM root c +WHERE (c["Discriminator"] = "Customer") +ORDER BY c["CustomerID"] +"""); + } + private void AssertSql(params string[] expected) => Fixture.TestSqlLoggerFactory.AssertBaseline(expected); From 2dccae9235bfba6b5795c6e5c862c15118282fa9 Mon Sep 17 00:00:00 2001 From: Shay Rojansky Date: Sat, 29 Jun 2024 09:39:34 +0200 Subject: [PATCH 2/6] Address review comments --- .../Extensions/CosmosQueryableExtensions.cs | 33 ++++++++----------- src/EFCore.Cosmos/Query/CosmosPage.cs | 29 ++++++++++++++++ ...yableMethodTranslatingExpressionVisitor.cs | 3 +- ...ressionVisitor.PagingQueryingEnumerable.cs | 17 ++++------ .../Internal/Expressions/PagingExpression.cs | 10 +++--- 5 files changed, 57 insertions(+), 35 deletions(-) create mode 100644 src/EFCore.Cosmos/Query/CosmosPage.cs diff --git a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs index c7b8e89eb90..0224bf9256b 100644 --- a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs +++ b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs @@ -1,7 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using Azure; using JetBrains.Annotations; using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal; using Microsoft.EntityFrameworkCore.Query.Internal; @@ -190,14 +189,10 @@ private static FromSqlQueryRootExpression GenerateFromSqlQueryRoot( #region ToPage internal static readonly MethodInfo ToPageAsyncMethodInfo - = typeof(CosmosQueryableExtensions).GetTypeInfo() - .GetDeclaredMethods(nameof(ToPageAsync)) - .Single(); + = typeof(CosmosQueryableExtensions).GetMethod(nameof(ToPageAsync))!; internal static readonly MethodInfo ToPageMethodInfo - = typeof(CosmosQueryableExtensions).GetTypeInfo() - .GetDeclaredMethods(nameof(ToPage)) - .Single(); + = typeof(CosmosQueryableExtensions).GetMethod(nameof(ToPage))!; /// /// Allows paginating through query results by repeatedly executing the same query, passing continuation tokens to retrieve @@ -205,21 +200,21 @@ internal static readonly MethodInfo ToPageMethodInfo /// /// The source query. /// - /// An optional continuation token returned from a previous execution of this query via . - /// If , retrieves query results from the start. + /// An optional continuation token returned from a previous execution of this query via + /// . If , retrieves query results from the start. /// /// - /// The maximum number of results in the returned . The page may contain fewer results of the database + /// The maximum number of results in the returned . The page may contain fewer results of the database /// did not contain enough matching results. /// /// Limits the length of continuation token in the query response. - /// A containing at most results. - public static Page ToPage( + /// A containing at most results. + public static CosmosPage ToPage( this IQueryable source, string? continuationToken = null, int? maxItemCount = null, int? responseContinuationTokenLimitInKb = null) - => source.Provider.Execute>( + => source.Provider.Execute>( Expression.Call( instance: null, method: ToPageMethodInfo.MakeGenericMethod(typeof(TSource)), @@ -237,17 +232,17 @@ public static Page ToPage( /// /// The source query. /// - /// An optional continuation token returned from a previous execution of this query via . - /// If , retrieves query results from the start. + /// An optional continuation token returned from a previous execution of this query via + /// . If , retrieves query results from the start. /// /// - /// The maximum number of results in the returned . The page may contain fewer results of the database + /// The maximum number of results in the returned . The page may contain fewer results of the database /// did not contain enough matching results. /// /// Limits the length of continuation token in the query response. /// A to observe while waiting for the task to complete. - /// A containing at most results. - public static Task> ToPageAsync( + /// A containing at most results. + public static Task> ToPageAsync( this IQueryable source, string? continuationToken = null, int? maxItemCount = null, @@ -259,7 +254,7 @@ public static Task> ToPageAsync( throw new InvalidOperationException(CoreStrings.IQueryableProviderNotAsync); } - return provider.ExecuteAsync>>( + return provider.ExecuteAsync>>( Expression.Call( instance: null, method: ToPageAsyncMethodInfo.MakeGenericMethod(typeof(TSource)), diff --git a/src/EFCore.Cosmos/Query/CosmosPage.cs b/src/EFCore.Cosmos/Query/CosmosPage.cs new file mode 100644 index 00000000000..482429da5df --- /dev/null +++ b/src/EFCore.Cosmos/Query/CosmosPage.cs @@ -0,0 +1,29 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +// ReSharper disable once CheckNamespace +namespace Microsoft.EntityFrameworkCore; + +/// +/// A single page of results returned from a user query; can be used to paginate through long result-sets. +/// Returned by . +/// +/// The values contained in this page. +/// +/// The continuation token for fetching further results from the query. Is or empty when there are no more +/// results. +/// +/// The type of values contained in the page. +public readonly struct CosmosPage(IReadOnlyList values, string? continuationToken) +{ + /// + /// The values contained in this page. + /// + public IReadOnlyList Values { get; } = values; + + /// + /// The continuation token for fetching further results from the query. Is or empty when there are no more + /// results. + /// + public string? ContinuationToken { get; } = continuationToken; +} diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs index 4baf6d174bc..b2fc7940f6e 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs @@ -137,7 +137,8 @@ public override Expression Translate(Expression expression) shapedQuery.ShaperExpression, translatedContinuationToken, translatedMaxItemCount, - translatedResponseContinuationTokenLimitInKb)) + translatedResponseContinuationTokenLimitInKb, + typeof(CosmosPage<>).MakeGenericType(shapedQuery.ShaperExpression.Type))) .UpdateResultCardinality(ResultCardinality.Single); } diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs index d905b3e9e14..19a59d7e5b8 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs @@ -4,14 +4,9 @@ #nullable disable using System.Collections; -using System.Diagnostics.CodeAnalysis; -using System.Runtime.CompilerServices; -using System.Text; -using Azure; using Microsoft.EntityFrameworkCore.Cosmos.Diagnostics.Internal; using Microsoft.EntityFrameworkCore.Cosmos.Internal; using Microsoft.EntityFrameworkCore.Cosmos.Storage.Internal; -using Newtonsoft.Json; using Newtonsoft.Json.Linq; namespace Microsoft.EntityFrameworkCore.Cosmos.Query.Internal; @@ -24,7 +19,7 @@ namespace Microsoft.EntityFrameworkCore.Cosmos.Query.Internal; /// public partial class CosmosShapedQueryCompilingExpressionVisitor { - private sealed class PagingQueryingEnumerable : IEnumerable>, IAsyncEnumerable> + private sealed class PagingQueryingEnumerable : IEnumerable>, IAsyncEnumerable> { private readonly CosmosQueryContext _cosmosQueryContext; private readonly ISqlExpressionFactory _sqlExpressionFactory; @@ -83,13 +78,13 @@ public PagingQueryingEnumerable( _cosmosContainer = cosmosContainer; } - public IEnumerator> GetEnumerator() + public IEnumerator> GetEnumerator() => new Enumerator(this); IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - public IAsyncEnumerator> GetAsyncEnumerator(CancellationToken cancellationToken = default) + public IAsyncEnumerator> GetAsyncEnumerator(CancellationToken cancellationToken = default) => new Enumerator(this, cancellationToken); private CosmosSqlQuery GenerateQuery() @@ -100,7 +95,7 @@ private CosmosSqlQuery GenerateQuery() .Visit(_selectExpression), _cosmosQueryContext.ParameterValues); - private sealed class Enumerator : IEnumerator>, IAsyncEnumerator> + private sealed class Enumerator : IEnumerator>, IAsyncEnumerator> { private readonly PagingQueryingEnumerable _queryingEnumerable; private readonly CosmosQueryContext _cosmosQueryContext; @@ -137,7 +132,7 @@ public Enumerator(PagingQueryingEnumerable queryingEnumerable, CancellationTo : null; } - public Page Current { get; private set; } + public CosmosPage Current { get; private set; } object IEnumerator.Current => Current; @@ -223,7 +218,7 @@ private async Task MoveNextCore() } } - Current = Page.FromValues(results, continuationToken, null!); // TODO: Response... + Current = new CosmosPage(results, continuationToken); _hasExecuted = true; return true; diff --git a/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs b/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs index 9f63c76a95a..8ef2c9b891e 100644 --- a/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs +++ b/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs @@ -13,7 +13,8 @@ public class PagingExpression( Expression expression, SqlParameterExpression continuationToken, SqlParameterExpression maxItemCount, - SqlParameterExpression responseContinuationTokenLimitInKb) + SqlParameterExpression responseContinuationTokenLimitInKb, + Type type) : Expression, IPrintableExpression { /// @@ -31,8 +32,7 @@ public sealed override ExpressionType NodeType /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public override Type Type - => Expression.Type; + public override Type Type { get; } = type; /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to @@ -84,7 +84,9 @@ protected override Expression VisitChildren(ExpressionVisitor visitor) public virtual PagingExpression Update(Expression expression) => expression == Expression ? this - : new PagingExpression(expression, ContinuationToken, MaxItemCount, ResponseContinuationTokenLimitInKb); + : expression.Type == Expression.Type + ? new PagingExpression(expression, ContinuationToken, MaxItemCount, ResponseContinuationTokenLimitInKb, Type) + : throw new UnreachableException("Can't change the Type of a PagingExpression"); /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to From 9f94fedb354b0733bd510d239a5567fcfceac8d2 Mon Sep 17 00:00:00 2001 From: Shay Rojansky Date: Sat, 29 Jun 2024 10:09:14 +0200 Subject: [PATCH 3/6] Use ResponseMessageEnumerator for regular document enumeration Fixes #34092 --- .../Storage/Internal/CosmosClientWrapper.cs | 58 +++++++------------ 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs b/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs index 0d371c0bf15..c2c64c84460 100644 --- a/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs +++ b/src/EFCore.Cosmos/Storage/Internal/CosmosClientWrapper.cs @@ -739,9 +739,7 @@ private sealed class Enumerator(DocumentEnumerable documentEnumerable) : IEnumer private JObject? _current; private ResponseMessage? _responseMessage; - private Stream? _responseStream; - private StreamReader? _reader; - private JsonTextReader? _jsonReader; + private IEnumerator? _responseMessageEnumerator; private FeedIterator? _query; @@ -754,7 +752,7 @@ object IEnumerator.Current [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool MoveNext() { - if (_jsonReader == null) + if (_responseMessageEnumerator == null) { if (_query is null) { @@ -786,14 +784,12 @@ public bool MoveNext() _responseMessage.EnsureSuccessStatusCode(); - _responseStream = _responseMessage.Content; - _reader = new StreamReader(_responseStream); - _jsonReader = CreateJsonReader(_reader); + _responseMessageEnumerator = new ResponseMessageEnumerable(_responseMessage).GetEnumerator(); } - if (TryReadJObject(_jsonReader, out var jObject)) + if (_responseMessageEnumerator.MoveNext()) { - _current = jObject; + _current = _responseMessageEnumerator.Current; return true; } @@ -804,20 +800,15 @@ public bool MoveNext() private void ResetRead() { - _jsonReader?.Close(); - _jsonReader = null; - _reader?.Dispose(); - _reader = null; - _responseStream?.Dispose(); - _responseStream = null; + _responseMessageEnumerator?.Dispose(); + _responseMessageEnumerator = null; + _responseMessage?.Dispose(); } public void Dispose() { ResetRead(); - - _responseMessage?.Dispose(); - _responseMessage = null; + _query?.Dispose(); } public void Reset() @@ -850,9 +841,7 @@ private sealed class AsyncEnumerator(DocumentAsyncEnumerable documentEnumerable, private JObject? _current; private ResponseMessage? _responseMessage; - private Stream? _responseStream; - private StreamReader? _reader; - private JsonTextReader? _jsonReader; + private IAsyncEnumerator? _responseMessageEnumerator; private FeedIterator? _query; @@ -864,7 +853,7 @@ public async ValueTask MoveNextAsync() { cancellationToken.ThrowIfCancellationRequested(); - if (_jsonReader == null) + if (_responseMessageEnumerator == null) { if (_query is null) { @@ -896,14 +885,12 @@ public async ValueTask MoveNextAsync() _responseMessage.EnsureSuccessStatusCode(); - _responseStream = _responseMessage.Content; - _reader = new StreamReader(_responseStream); - _jsonReader = CreateJsonReader(_reader); + _responseMessageEnumerator = new ResponseMessageEnumerable(_responseMessage).GetAsyncEnumerator(cancellationToken); } - if (TryReadJObject(_jsonReader, out var jObject)) + if (await _responseMessageEnumerator.MoveNextAsync().ConfigureAwait(false)) { - _current = jObject; + _current = _responseMessageEnumerator.Current; return true; } @@ -914,20 +901,19 @@ public async ValueTask MoveNextAsync() private async Task ResetReadAsync() { - _jsonReader?.Close(); - _jsonReader = null; - await _reader.DisposeAsyncIfAvailable().ConfigureAwait(false); - _reader = null; - await _responseStream.DisposeAsyncIfAvailable().ConfigureAwait(false); - _responseStream = null; + if (_responseMessageEnumerator is not null) + { + await _responseMessageEnumerator.DisposeAsync().ConfigureAwait(false); + _responseMessageEnumerator = null; + } + + _responseMessage?.Dispose(); } public async ValueTask DisposeAsync() { await ResetReadAsync().ConfigureAwait(false); - - await _responseMessage.DisposeAsyncIfAvailable().ConfigureAwait(false); - _responseMessage = null; + _query?.Dispose(); } } } From 3ada9165d55b9195ea85ba4f891b818a6dd6b468 Mon Sep 17 00:00:00 2001 From: Shay Rojansky Date: Sat, 29 Jun 2024 10:23:29 +0200 Subject: [PATCH 4/6] Make maxItemCount required --- .../Extensions/CosmosQueryableExtensions.cs | 8 ++--- ...yableMethodTranslatingExpressionVisitor.cs | 6 ++-- ...ressionVisitor.PagingQueryingEnumerable.cs | 12 +++---- ...osShapedQueryCompilingExpressionVisitor.cs | 6 ++-- .../Internal/Expressions/PagingExpression.cs | 6 ++-- .../NorthwindMiscellaneousQueryCosmosTest.cs | 34 +++++++++++++++++-- 6 files changed, 51 insertions(+), 21 deletions(-) diff --git a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs index 0224bf9256b..a2c05079ef6 100644 --- a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs +++ b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs @@ -211,8 +211,8 @@ internal static readonly MethodInfo ToPageMethodInfo /// A containing at most results. public static CosmosPage ToPage( this IQueryable source, + int maxItemCount, string? continuationToken = null, - int? maxItemCount = null, int? responseContinuationTokenLimitInKb = null) => source.Provider.Execute>( Expression.Call( @@ -221,8 +221,8 @@ public static CosmosPage ToPage( arguments: [ source.Expression, + Expression.Constant(maxItemCount, typeof(int)), Expression.Constant(continuationToken, typeof(string)), - Expression.Constant(maxItemCount, typeof(int?)), Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)) ])); @@ -244,8 +244,8 @@ public static CosmosPage ToPage( /// A containing at most results. public static Task> ToPageAsync( this IQueryable source, + int maxItemCount, string? continuationToken = null, - int? maxItemCount = null, int? responseContinuationTokenLimitInKb = null, CancellationToken cancellationToken = default) { @@ -261,8 +261,8 @@ public static Task> ToPageAsync( arguments: [ source.Expression, + Expression.Constant(maxItemCount, typeof(int)), Expression.Constant(continuationToken, typeof(string)), - Expression.Constant(maxItemCount, typeof(int?)), Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)), Expression.Constant(default(CancellationToken), typeof(CancellationToken)) ]), diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs index b2fc7940f6e..6c1fd5dc950 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs @@ -117,13 +117,13 @@ public override Expression Translate(Expression expression) if (arguments is not [ _, // source - ParameterExpression continuationToken, ParameterExpression maxItemCount, + ParameterExpression continuationToken, ParameterExpression responseContinuationTokenLimitInKb, .. ] - || _sqlTranslator.Translate(continuationToken) is not SqlParameterExpression translatedContinuationToken || _sqlTranslator.Translate(maxItemCount) is not SqlParameterExpression translatedMaxItemCount + || _sqlTranslator.Translate(continuationToken) is not SqlParameterExpression translatedContinuationToken || _sqlTranslator.Translate(responseContinuationTokenLimitInKb) is not SqlParameterExpression translatedResponseContinuationTokenLimitInKb) { @@ -135,8 +135,8 @@ public override Expression Translate(Expression expression) return shapedQuery .UpdateShaperExpression(new PagingExpression( shapedQuery.ShaperExpression, - translatedContinuationToken, translatedMaxItemCount, + translatedContinuationToken, translatedResponseContinuationTokenLimitInKb, typeof(CosmosPage<>).MakeGenericType(shapedQuery.ShaperExpression.Type))) .UpdateResultCardinality(ResultCardinality.Single); diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs index 19a59d7e5b8..8d19f31596b 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs @@ -33,8 +33,8 @@ private sealed class PagingQueryingEnumerable : IEnumerable>, I private readonly IDiagnosticsLogger _commandLogger; private readonly bool _standAloneStateManager; private readonly bool _threadSafetyChecksEnabled; - private readonly string _continuationTokenParameterName; private readonly string _maxItemCountParameterName; + private readonly string _continuationTokenParameterName; private readonly string _responseContinuationTokenLimitInKbParameterName; public PagingQueryingEnumerable( @@ -48,8 +48,8 @@ public PagingQueryingEnumerable( PartitionKey partitionKeyValueFromExtension, bool standAloneStateManager, bool threadSafetyChecksEnabled, - string continuationTokenParameterName, string maxItemCountParameterName, + string continuationTokenParameterName, string responseContinuationTokenLimitInKbParameterName) { _cosmosQueryContext = cosmosQueryContext; @@ -62,8 +62,8 @@ public PagingQueryingEnumerable( _commandLogger = cosmosQueryContext.CommandLogger; _standAloneStateManager = standAloneStateManager; _threadSafetyChecksEnabled = threadSafetyChecksEnabled; - _continuationTokenParameterName = continuationTokenParameterName; _maxItemCountParameterName = maxItemCountParameterName; + _continuationTokenParameterName = continuationTokenParameterName; _responseContinuationTokenLimitInKbParameterName = responseContinuationTokenLimitInKbParameterName; var partitionKey = selectExpression.GetPartitionKeyValue(cosmosQueryContext.ParameterValues); @@ -159,9 +159,9 @@ private async Task MoveNextCore() _hasExecuted = true; + var maxItemCount = (int)_cosmosQueryContext.ParameterValues[_queryingEnumerable._maxItemCountParameterName]; var continuationToken = (string)_cosmosQueryContext.ParameterValues[_queryingEnumerable._continuationTokenParameterName]; - var maxItemCount = (int?)_cosmosQueryContext.ParameterValues[_queryingEnumerable._maxItemCountParameterName]; var responseContinuationTokenLimitInKb = (int?) _cosmosQueryContext.ParameterValues[_queryingEnumerable._responseContinuationTokenLimitInKbParameterName]; @@ -183,9 +183,9 @@ private async Task MoveNextCore() _commandLogger.ExecutingSqlQuery(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery); _cosmosQueryContext.InitializeStateManager(_standAloneStateManager); - var results = maxItemCount.HasValue ? new List(maxItemCount.Value) : []; + var results = new List(maxItemCount); - while (maxItemCount is null or > 0) + while (maxItemCount > 0) { queryRequestOptions.MaxItemCount = maxItemCount; using var feedIterator = cosmosClient.CreateQuery( diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs index dea15f6877d..54dc03462c9 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.cs @@ -45,7 +45,7 @@ protected override Expression VisitShapedQuery(ShapedQueryExpression shapedQuery var shaperBody = shapedQueryExpression.ShaperExpression; - var (paging, continuationToken, maxItemCount, responseContinuationTokenLimitInKb) = + var (paging, maxItemCount, continuationToken, responseContinuationTokenLimitInKb) = (false, (SqlParameterExpression)null, (SqlParameterExpression)null, (SqlParameterExpression)null); // If the query is terminated ToPageAsync(), CosmosQueryableMethodTranslatingExpressionVisitor composed a PagingExpression on top @@ -54,8 +54,8 @@ protected override Expression VisitShapedQuery(ShapedQueryExpression shapedQuery if (shaperBody is PagingExpression pagingExpression) { paging = true; - continuationToken = pagingExpression.ContinuationToken; maxItemCount = pagingExpression.MaxItemCount; + continuationToken = pagingExpression.ContinuationToken; responseContinuationTokenLimitInKb = pagingExpression.ResponseContinuationTokenLimitInKb; shaperBody = pagingExpression.Expression; @@ -113,8 +113,8 @@ protected override Expression VisitShapedQuery(ShapedQueryExpression shapedQuery Constant(_partitionKeyValueFromExtension, typeof(PartitionKey)), standAloneStateManagerConstant, threadSafetyConstant, - Constant(continuationToken.Name), Constant(maxItemCount.Name), + Constant(continuationToken.Name), Constant(responseContinuationTokenLimitInKb.Name)), _ => New( diff --git a/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs b/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs index 8ef2c9b891e..0399f985067 100644 --- a/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs +++ b/src/EFCore.Cosmos/Query/Internal/Expressions/PagingExpression.cs @@ -11,8 +11,8 @@ namespace Microsoft.EntityFrameworkCore.Cosmos.Query.Internal.Expressions; /// public class PagingExpression( Expression expression, - SqlParameterExpression continuationToken, SqlParameterExpression maxItemCount, + SqlParameterExpression continuationToken, SqlParameterExpression responseContinuationTokenLimitInKb, Type type) : Expression, IPrintableExpression @@ -48,7 +48,7 @@ public sealed override ExpressionType NodeType /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public virtual SqlParameterExpression ContinuationToken { get; } = continuationToken; + public virtual SqlParameterExpression MaxItemCount { get; } = maxItemCount; /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to @@ -56,7 +56,7 @@ public sealed override ExpressionType NodeType /// any release. You should only use it directly in your code with extreme caution and knowing that /// doing so can result in application failures when updating to a new Entity Framework Core release. /// - public virtual SqlParameterExpression MaxItemCount { get; } = maxItemCount; + public virtual SqlParameterExpression ContinuationToken { get; } = continuationToken; /// /// This is an internal API that supports the Entity Framework Core infrastructure and not subject to diff --git a/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs b/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs index 050b78a8fbf..6160eacaa3e 100644 --- a/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs +++ b/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs @@ -5340,7 +5340,7 @@ public virtual async Task ToPageAsync() var page2 = await context.Set() .OrderBy(c => c.CustomerID) - .ToPageAsync(continuationToken: page1.ContinuationToken, maxItemCount: 2); + .ToPageAsync(maxItemCount: 2, page1.ContinuationToken); Assert.Collection( page2.Values, @@ -5349,7 +5349,7 @@ public virtual async Task ToPageAsync() var page3 = await context.Set() .OrderBy(c => c.CustomerID) - .ToPageAsync(continuationToken: page2.ContinuationToken); + .ToPageAsync(maxItemCount: totalCustomers, page2.ContinuationToken); Assert.Equal(totalCustomers - 3, page3.Values.Count); Assert.Null(page3.ContinuationToken); @@ -5383,6 +5383,36 @@ ORDER BY c["CustomerID"] """); } + [ConditionalFact] + public virtual async Task ToPageAsync_with_exact_maxItemCount() + { + await using var context = CreateContext(); + + var totalCustomers = await context.Set().CountAsync(); + + var onlyPage = await context.Set() + .OrderBy(c => c.CustomerID) + .ToPageAsync(maxItemCount: totalCustomers); + + Assert.Equal("ALFKI", onlyPage.Values[0].CustomerID); + Assert.Equal("WOLZA", onlyPage.Values[^1].CustomerID); + Assert.Null(onlyPage.ContinuationToken); + + AssertSql( + """ +SELECT COUNT(1) AS c +FROM root c +WHERE (c["Discriminator"] = "Customer") +""", + // + """ +SELECT c +FROM root c +WHERE (c["Discriminator"] = "Customer") +ORDER BY c["CustomerID"] +"""); + } + private void AssertSql(params string[] expected) => Fixture.TestSqlLoggerFactory.AssertBaseline(expected); From 91794d22bf1fbb3d7e24ed350600227ede7f6e6b Mon Sep 17 00:00:00 2001 From: Shay Rojansky Date: Mon, 1 Jul 2024 22:56:53 +0200 Subject: [PATCH 5/6] Address design meeting comments https://github.com/dotnet/efcore/pull/34103#issuecomment-2201021149 --- src/EFCore.Analyzers/EFDiagnostics.cs | 1 + src/EFCore.Cosmos/EFCore.Cosmos.csproj | 1 + .../Extensions/CosmosQueryableExtensions.cs | 60 +++---------------- src/EFCore.Cosmos/Query/CosmosPage.cs | 5 ++ ...yableMethodTranslatingExpressionVisitor.cs | 4 +- ...ressionVisitor.PagingQueryingEnumerable.cs | 34 +++-------- .../EFCore.Cosmos.FunctionalTests.csproj | 1 + .../NorthwindMiscellaneousQueryCosmosTest.cs | 8 +-- 8 files changed, 29 insertions(+), 85 deletions(-) diff --git a/src/EFCore.Analyzers/EFDiagnostics.cs b/src/EFCore.Analyzers/EFDiagnostics.cs index b8eb034043a..acedc129899 100644 --- a/src/EFCore.Analyzers/EFDiagnostics.cs +++ b/src/EFCore.Analyzers/EFDiagnostics.cs @@ -17,4 +17,5 @@ public static class EFDiagnostics public const string ProviderExperimentalApi = "EF9002"; public const string PrecompiledQueryExperimental = "EF9100"; public const string MetricsExperimental = "EF9101"; + public const string PagingExperimental = "EF9102"; } diff --git a/src/EFCore.Cosmos/EFCore.Cosmos.csproj b/src/EFCore.Cosmos/EFCore.Cosmos.csproj index 935e7f25195..d14b89967ce 100644 --- a/src/EFCore.Cosmos/EFCore.Cosmos.csproj +++ b/src/EFCore.Cosmos/EFCore.Cosmos.csproj @@ -10,6 +10,7 @@ $(PackageTags);CosmosDb;SQL API true $(NoWarn);EF9101 + $(NoWarn);EF9102 diff --git a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs index a2c05079ef6..7def403562a 100644 --- a/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs +++ b/src/EFCore.Cosmos/Extensions/CosmosQueryableExtensions.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Diagnostics.CodeAnalysis; using JetBrains.Annotations; using Microsoft.EntityFrameworkCore.Cosmos.Query.Internal; using Microsoft.EntityFrameworkCore.Query.Internal; @@ -17,8 +18,6 @@ namespace Microsoft.EntityFrameworkCore; /// public static class CosmosQueryableExtensions { - #region WithPartitionKey - internal static readonly MethodInfo WithPartitionKeyMethodInfo = typeof(CosmosQueryableExtensions).GetTypeInfo() .GetDeclaredMethods(nameof(WithPartitionKey)) @@ -76,10 +75,6 @@ source.Provider is EntityQueryProvider : source; } - #endregion WithPartitionKey - - #region FromSql - /// /// Creates a LINQ query based on an interpolated string representing a SQL query. /// @@ -184,47 +179,9 @@ private static FromSqlQueryRootExpression GenerateFromSqlQueryRoot( Expression.Constant(arguments)); } - #endregion FromSql - - #region ToPage - internal static readonly MethodInfo ToPageAsyncMethodInfo = typeof(CosmosQueryableExtensions).GetMethod(nameof(ToPageAsync))!; - internal static readonly MethodInfo ToPageMethodInfo - = typeof(CosmosQueryableExtensions).GetMethod(nameof(ToPage))!; - - /// - /// Allows paginating through query results by repeatedly executing the same query, passing continuation tokens to retrieve - /// successive pages of the result set, and specifying the maximum number of results per page. - /// - /// The source query. - /// - /// An optional continuation token returned from a previous execution of this query via - /// . If , retrieves query results from the start. - /// - /// - /// The maximum number of results in the returned . The page may contain fewer results of the database - /// did not contain enough matching results. - /// - /// Limits the length of continuation token in the query response. - /// A containing at most results. - public static CosmosPage ToPage( - this IQueryable source, - int maxItemCount, - string? continuationToken = null, - int? responseContinuationTokenLimitInKb = null) - => source.Provider.Execute>( - Expression.Call( - instance: null, - method: ToPageMethodInfo.MakeGenericMethod(typeof(TSource)), - arguments: - [ - source.Expression, - Expression.Constant(maxItemCount, typeof(int)), - Expression.Constant(continuationToken, typeof(string)), - Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)) - ])); /// /// Allows paginating through query results by repeatedly executing the same query, passing continuation tokens to retrieve @@ -235,17 +192,18 @@ public static CosmosPage ToPage( /// An optional continuation token returned from a previous execution of this query via /// . If , retrieves query results from the start. /// - /// - /// The maximum number of results in the returned . The page may contain fewer results of the database + /// + /// The maximum number of results in the returned . The page may contain fewer results if the database /// did not contain enough matching results. /// /// Limits the length of continuation token in the query response. /// A to observe while waiting for the task to complete. - /// A containing at most results. + /// A containing at most results. + [Experimental(EFDiagnostics.PagingExperimental)] public static Task> ToPageAsync( this IQueryable source, - int maxItemCount, - string? continuationToken = null, + int pageSize, + string? continuationToken, int? responseContinuationTokenLimitInKb = null, CancellationToken cancellationToken = default) { @@ -261,13 +219,11 @@ public static Task> ToPageAsync( arguments: [ source.Expression, - Expression.Constant(maxItemCount, typeof(int)), + Expression.Constant(pageSize, typeof(int)), Expression.Constant(continuationToken, typeof(string)), Expression.Constant(responseContinuationTokenLimitInKb, typeof(int?)), Expression.Constant(default(CancellationToken), typeof(CancellationToken)) ]), cancellationToken); } - - #endregion ToPage } diff --git a/src/EFCore.Cosmos/Query/CosmosPage.cs b/src/EFCore.Cosmos/Query/CosmosPage.cs index 482429da5df..a6af338b517 100644 --- a/src/EFCore.Cosmos/Query/CosmosPage.cs +++ b/src/EFCore.Cosmos/Query/CosmosPage.cs @@ -1,6 +1,10 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +// ReSharper disable once CheckNamespace + +using System.Diagnostics.CodeAnalysis; + // ReSharper disable once CheckNamespace namespace Microsoft.EntityFrameworkCore; @@ -14,6 +18,7 @@ namespace Microsoft.EntityFrameworkCore; /// results. /// /// The type of values contained in the page. +[Experimental(EFDiagnostics.PagingExperimental)] public readonly struct CosmosPage(IReadOnlyList values, string? continuationToken) { /// diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs index 6c1fd5dc950..1bc4e87cd2a 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosQueryableMethodTranslatingExpressionVisitor.cs @@ -98,7 +98,7 @@ public override Expression Translate(Expression expression) // Handle ToPageAsync(), which can only ever be the top-level node in the query tree. if (expression is MethodCallExpression { Method: var method, Arguments: var arguments } && method.DeclaringType == typeof(CosmosQueryableExtensions) - && method.Name is nameof(CosmosQueryableExtensions.ToPage) or nameof(CosmosQueryableExtensions.ToPageAsync)) + && method.Name is nameof(CosmosQueryableExtensions.ToPageAsync)) { var source = base.Translate(arguments[0]); @@ -120,7 +120,7 @@ public override Expression Translate(Expression expression) ParameterExpression maxItemCount, ParameterExpression continuationToken, ParameterExpression responseContinuationTokenLimitInKb, - .. + _ // cancellation token ] || _sqlTranslator.Translate(maxItemCount) is not SqlParameterExpression translatedMaxItemCount || _sqlTranslator.Translate(continuationToken) is not SqlParameterExpression translatedContinuationToken diff --git a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs index 8d19f31596b..6685d6fc8a5 100644 --- a/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs +++ b/src/EFCore.Cosmos/Query/Internal/CosmosShapedQueryCompilingExpressionVisitor.PagingQueryingEnumerable.cs @@ -19,7 +19,7 @@ namespace Microsoft.EntityFrameworkCore.Cosmos.Query.Internal; /// public partial class CosmosShapedQueryCompilingExpressionVisitor { - private sealed class PagingQueryingEnumerable : IEnumerable>, IAsyncEnumerable> + private sealed class PagingQueryingEnumerable : IAsyncEnumerable> { private readonly CosmosQueryContext _cosmosQueryContext; private readonly ISqlExpressionFactory _sqlExpressionFactory; @@ -78,14 +78,8 @@ public PagingQueryingEnumerable( _cosmosContainer = cosmosContainer; } - public IEnumerator> GetEnumerator() - => new Enumerator(this); - - IEnumerator IEnumerable.GetEnumerator() - => GetEnumerator(); - public IAsyncEnumerator> GetAsyncEnumerator(CancellationToken cancellationToken = default) - => new Enumerator(this, cancellationToken); + => new AsyncEnumerator(this, cancellationToken); private CosmosSqlQuery GenerateQuery() => _querySqlGeneratorFactory.Create().GetSqlQuery( @@ -95,7 +89,7 @@ private CosmosSqlQuery GenerateQuery() .Visit(_selectExpression), _cosmosQueryContext.ParameterValues); - private sealed class Enumerator : IEnumerator>, IAsyncEnumerator> + private sealed class AsyncEnumerator : IAsyncEnumerator> { private readonly PagingQueryingEnumerable _queryingEnumerable; private readonly CosmosQueryContext _cosmosQueryContext; @@ -113,7 +107,7 @@ private sealed class Enumerator : IEnumerator>, IAsyncEnumerator queryingEnumerable, CancellationToken cancellationToken = default) + public AsyncEnumerator(PagingQueryingEnumerable queryingEnumerable, CancellationToken cancellationToken = default) { _queryingEnumerable = queryingEnumerable; _cosmosQueryContext = queryingEnumerable._cosmosQueryContext; @@ -134,17 +128,9 @@ public Enumerator(PagingQueryingEnumerable queryingEnumerable, CancellationTo public CosmosPage Current { get; private set; } - object IEnumerator.Current => Current; - - public bool MoveNext() - => MoveNextCore().GetAwaiter().GetResult(); - - public ValueTask MoveNextAsync() - => new(MoveNextCore()); - - private async Task MoveNextCore() + public async ValueTask MoveNextAsync() { - ObjectDisposedException.ThrowIf(_isDisposed, typeof(Enumerator)); + ObjectDisposedException.ThrowIf(_isDisposed, typeof(AsyncEnumerator)); try { @@ -245,16 +231,10 @@ private async Task MoveNextCore() public ValueTask DisposeAsync() { - Dispose(); + _isDisposed = true; return default; } - - public void Dispose() - => _isDisposed = true; - - public void Reset() - => throw new NotSupportedException(); } } } diff --git a/test/EFCore.Cosmos.FunctionalTests/EFCore.Cosmos.FunctionalTests.csproj b/test/EFCore.Cosmos.FunctionalTests/EFCore.Cosmos.FunctionalTests.csproj index 01df755416e..a8ecf58320d 100644 --- a/test/EFCore.Cosmos.FunctionalTests/EFCore.Cosmos.FunctionalTests.csproj +++ b/test/EFCore.Cosmos.FunctionalTests/EFCore.Cosmos.FunctionalTests.csproj @@ -8,6 +8,7 @@ true true + $(NoWarn);EF9102 diff --git a/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs b/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs index 6160eacaa3e..f9dccc60c94 100644 --- a/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs +++ b/test/EFCore.Cosmos.FunctionalTests/Query/NorthwindMiscellaneousQueryCosmosTest.cs @@ -5333,14 +5333,14 @@ public virtual async Task ToPageAsync() var page1 = await context.Set() .OrderBy(c => c.CustomerID) - .ToPageAsync(maxItemCount: 1); + .ToPageAsync(pageSize: 1, continuationToken: null); var customer1 = Assert.Single(page1.Values); Assert.Equal("ALFKI", customer1.CustomerID); var page2 = await context.Set() .OrderBy(c => c.CustomerID) - .ToPageAsync(maxItemCount: 2, page1.ContinuationToken); + .ToPageAsync(pageSize: 2, page1.ContinuationToken); Assert.Collection( page2.Values, @@ -5349,7 +5349,7 @@ public virtual async Task ToPageAsync() var page3 = await context.Set() .OrderBy(c => c.CustomerID) - .ToPageAsync(maxItemCount: totalCustomers, page2.ContinuationToken); + .ToPageAsync(pageSize: totalCustomers, page2.ContinuationToken); Assert.Equal(totalCustomers - 3, page3.Values.Count); Assert.Null(page3.ContinuationToken); @@ -5392,7 +5392,7 @@ public virtual async Task ToPageAsync_with_exact_maxItemCount() var onlyPage = await context.Set() .OrderBy(c => c.CustomerID) - .ToPageAsync(maxItemCount: totalCustomers); + .ToPageAsync(pageSize: totalCustomers, continuationToken: null); Assert.Equal("ALFKI", onlyPage.Values[0].CustomerID); Assert.Equal("WOLZA", onlyPage.Values[^1].CustomerID); From 370786bad680562f22f7083673d11fa7f0af3b2b Mon Sep 17 00:00:00 2001 From: Shay Rojansky Date: Mon, 1 Jul 2024 23:58:17 +0200 Subject: [PATCH 6/6] Remove comment --- src/EFCore.Cosmos/Query/CosmosPage.cs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/EFCore.Cosmos/Query/CosmosPage.cs b/src/EFCore.Cosmos/Query/CosmosPage.cs index a6af338b517..3ab66791975 100644 --- a/src/EFCore.Cosmos/Query/CosmosPage.cs +++ b/src/EFCore.Cosmos/Query/CosmosPage.cs @@ -1,8 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -// ReSharper disable once CheckNamespace - using System.Diagnostics.CodeAnalysis; // ReSharper disable once CheckNamespace