From 3836301665fc8fa499409c8c0ac2acd187cfd774 Mon Sep 17 00:00:00 2001 From: Tanay Parikh Date: Mon, 28 Jun 2021 11:53:16 -0700 Subject: [PATCH 1/6] Streaming Interop Followup Items --- .../Server/src/Circuits/RemoteJSDataStream.cs | 12 +++++++----- .../Server/src/Circuits/RemoteJSRuntime.cs | 4 ++-- .../Server/test/Circuits/RemoteJSDataStreamTest.cs | 9 +++++---- .../src/Forms/InputFile/RemoteBrowserFileStream.cs | 6 +++++- .../Microsoft.JSInterop/src/IJSStreamReference.cs | 7 ++++--- .../src/Implementation/JSStreamReference.cs | 4 ++-- src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs | 5 +++-- .../Microsoft.JSInterop/src/PublicAPI.Unshipped.txt | 4 ++-- .../Microsoft.JSInterop/test/JSRuntimeTest.cs | 2 +- 9 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs index 807f18402401..2180cdd77ea9 100644 --- a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs +++ b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs @@ -40,10 +40,11 @@ public static async ValueTask CreateRemoteJSDataStreamAsync( RemoteJSRuntime runtime, IJSStreamReference jsStreamReference, long totalLength, - long maxBufferSize, long maximumIncomingBytes, TimeSpan jsInteropDefaultCallTimeout, - CancellationToken cancellationToken = default) + long pauseIncomingBytesThreshold, + long resumeIncomingBytesThreshold, + CancellationToken cancellationToken) { // Enforce minimum 1 kb, maximum 50 kb, SignalR message size. // We budget 512 bytes overhead for the transfer, thus leaving at least 512 bytes for data @@ -54,7 +55,7 @@ public static async ValueTask CreateRemoteJSDataStreamAsync( throw new ArgumentException($"SignalR MaximumIncomingBytes must be at least 1 kb."); var streamId = runtime.RemoteJSDataStreamNextInstanceId++; - var remoteJSDataStream = new RemoteJSDataStream(runtime, streamId, totalLength, maxBufferSize, jsInteropDefaultCallTimeout, cancellationToken); + var remoteJSDataStream = new RemoteJSDataStream(runtime, streamId, totalLength, jsInteropDefaultCallTimeout, pauseIncomingBytesThreshold, resumeIncomingBytesThreshold, cancellationToken); await runtime.InvokeVoidAsync("Blazor._internal.sendJSDataStream", jsStreamReference, streamId, chunkSize); return remoteJSDataStream; } @@ -63,8 +64,9 @@ private RemoteJSDataStream( RemoteJSRuntime runtime, long streamId, long totalLength, - long maxBufferSize, TimeSpan jsInteropDefaultCallTimeout, + long pauseIncomingBytesThreshold, + long resumeIncomingBytesThreshold, CancellationToken cancellationToken) { _runtime = runtime; @@ -78,7 +80,7 @@ private RemoteJSDataStream( _runtime.RemoteJSDataStreamInstances.Add(_streamId, this); - _pipe = new Pipe(new PipeOptions(pauseWriterThreshold: maxBufferSize, resumeWriterThreshold: maxBufferSize / 2)); + _pipe = new Pipe(new PipeOptions(pauseWriterThreshold: pauseIncomingBytesThreshold, resumeWriterThreshold: resumeIncomingBytesThreshold)); _pipeReaderStream = _pipe.Reader.AsStream(); } diff --git a/src/Components/Server/src/Circuits/RemoteJSRuntime.cs b/src/Components/Server/src/Circuits/RemoteJSRuntime.cs index 757359f1f926..0ceda9b752e8 100644 --- a/src/Components/Server/src/Circuits/RemoteJSRuntime.cs +++ b/src/Components/Server/src/Circuits/RemoteJSRuntime.cs @@ -157,8 +157,8 @@ public void MarkPermanentlyDisconnected() _clientProxy = null; } - protected override async Task ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long maxBufferSize, CancellationToken cancellationToken) - => await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(this, jsStreamReference, totalLength, maxBufferSize, _maximumIncomingBytes, _options.JSInteropDefaultCallTimeout, cancellationToken); + protected override async Task ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long pauseIncomingBytesThreshold, long resumeIncomingBytesThreshold, CancellationToken cancellationToken) + => await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(this, jsStreamReference, totalLength, _maximumIncomingBytes, _options.JSInteropDefaultCallTimeout, pauseIncomingBytesThreshold, resumeIncomingBytesThreshold, cancellationToken); public static class Log { diff --git a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs index 14dcf3eec46c..e3e43a593489 100644 --- a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs +++ b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs @@ -4,6 +4,7 @@ using System; using System.IO; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; @@ -25,7 +26,7 @@ public async void CreateRemoteJSDataStreamAsync_CreatesStream() var jsStreamReference = Mock.Of(); // Act - var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(_jsRuntime, jsStreamReference, totalLength: 100, maxBufferSize: 50, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1)); + var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(_jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None); // Assert Assert.NotNull(remoteJSDataStream); @@ -116,7 +117,7 @@ public async void ReceiveData_ProvidedWithMoreBytesThanRemaining() // Arrange var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); var jsStreamReference = Mock.Of(); - var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime, jsStreamReference, totalLength: 100, maxBufferSize: 50, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1)); + var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None); var streamId = GetStreamId(remoteJSDataStream, jsRuntime); var chunk = new byte[110]; // 100 byte totalLength for stream @@ -136,7 +137,7 @@ public async void ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDiscon // Arrange var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); var jsStreamReference = Mock.Of(); - var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime, jsStreamReference, totalLength: 100, maxBufferSize: 50, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1)); + var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None); var streamId = GetStreamId(remoteJSDataStream, jsRuntime); var chunk = new byte[5]; @@ -157,7 +158,7 @@ public async void ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDiscon private static async Task CreateRemoteJSDataStreamAsync(TestRemoteJSRuntime jsRuntime = null) { var jsStreamReference = Mock.Of(); - var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime ?? _jsRuntime, jsStreamReference, totalLength: 100, maxBufferSize: 50, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1)); + var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(jsRuntime ?? _jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None); return remoteJSDataStream; } diff --git a/src/Components/Web/src/Forms/InputFile/RemoteBrowserFileStream.cs b/src/Components/Web/src/Forms/InputFile/RemoteBrowserFileStream.cs index ddbee6e770db..9d633304a183 100644 --- a/src/Components/Web/src/Forms/InputFile/RemoteBrowserFileStream.cs +++ b/src/Components/Web/src/Forms/InputFile/RemoteBrowserFileStream.cs @@ -45,7 +45,11 @@ private async Task OpenReadStreamAsync(RemoteBrowserFileStreamOptions op _inputFileElement, File.Id); - return await dataReference.OpenReadStreamAsync(_maxAllowedSize, options.MaxBufferSize, cancellationToken); + return await dataReference.OpenReadStreamAsync( + _maxAllowedSize, + pauseIncomingBytesThreshold: options.MaxBufferSize, + resumeIncomingBytesThreshold: options.MaxBufferSize / 2, + cancellationToken); } protected override async ValueTask CopyFileDataIntoBuffer(long sourceOffset, Memory destination, CancellationToken cancellationToken) diff --git a/src/JSInterop/Microsoft.JSInterop/src/IJSStreamReference.cs b/src/JSInterop/Microsoft.JSInterop/src/IJSStreamReference.cs index 4cf01b5763f5..5fa34ca4aeb6 100644 --- a/src/JSInterop/Microsoft.JSInterop/src/IJSStreamReference.cs +++ b/src/JSInterop/Microsoft.JSInterop/src/IJSStreamReference.cs @@ -16,15 +16,16 @@ public interface IJSStreamReference : IAsyncDisposable /// /// Length of the provided by JavaScript. /// - public long Length { get; } + long Length { get; } /// /// Opens a with the for the current data reference. /// /// Maximum number of bytes permitted to be read from JavaScript. - /// Maximum number of bytes that are allowed to be buffered. + /// The number of unconsumed bytes to accept from JS before blocking. A value of zero prevents JS from blocking, allowing .NET to receive an unlimited number of bytes. + /// The number of unflushed bytes at which point JS stops blocking. /// for cancelling read. /// which can provide data associated with the current data reference. - ValueTask OpenReadStreamAsync(long maxAllowedSize = 512000, long maxBufferSize = 100 * 1024, CancellationToken cancellationToken = default); + ValueTask OpenReadStreamAsync(long maxAllowedSize = 512000, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, CancellationToken cancellationToken = default); } } diff --git a/src/JSInterop/Microsoft.JSInterop/src/Implementation/JSStreamReference.cs b/src/JSInterop/Microsoft.JSInterop/src/Implementation/JSStreamReference.cs index 269bcc9f87d8..a291c93b924d 100644 --- a/src/JSInterop/Microsoft.JSInterop/src/Implementation/JSStreamReference.cs +++ b/src/JSInterop/Microsoft.JSInterop/src/Implementation/JSStreamReference.cs @@ -36,14 +36,14 @@ internal JSStreamReference(JSRuntime jsRuntime, long id, long totalLength) : bas } /// - async ValueTask IJSStreamReference.OpenReadStreamAsync(long maxLength, long maxBufferSize, CancellationToken cancellationToken) + async ValueTask IJSStreamReference.OpenReadStreamAsync(long maxLength, long pauseIncomingBytesThreshold, long resumeIncomingBytesThreshold, CancellationToken cancellationToken) { if (Length > maxLength) { throw new ArgumentOutOfRangeException(nameof(maxLength), $"The incoming data stream of length {Length} exceeds the maximum length {maxLength}."); } - return await _jsRuntime.ReadJSDataAsStreamAsync(this, Length, maxBufferSize, cancellationToken); + return await _jsRuntime.ReadJSDataAsStreamAsync(this, Length, pauseIncomingBytesThreshold, resumeIncomingBytesThreshold, cancellationToken); } } } diff --git a/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs b/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs index 389166ab9b40..a3c8fbc99bb1 100644 --- a/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs +++ b/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs @@ -215,10 +215,11 @@ protected internal virtual void ReceiveByteArray(int id, byte[] data) /// /// to produce a data stream for. /// Expected length of the incoming data stream. - /// Amount of bytes to buffer before flushing. + /// The number of unconsumed bytes to accept from JS before blocking. A value of zero prevents JS from blocking, allowing .NET to receive an unlimited number of bytes. + /// The number of unflushed bytes at which point JS stops blocking. /// for cancelling read. /// for the data reference represented by . - protected internal virtual Task ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long maxBufferSize, CancellationToken cancellationToken) + protected internal virtual Task ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long pauseIncomingBytesThreshold, long resumeIncomingBytesThreshold, CancellationToken cancellationToken) { // The reason it's virtual and not abstract is just for back-compat diff --git a/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt b/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt index e1e22df85d77..3340aecfb9cb 100644 --- a/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt +++ b/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt @@ -1,7 +1,7 @@ #nullable enable Microsoft.JSInterop.IJSStreamReference Microsoft.JSInterop.IJSStreamReference.Length.get -> long -Microsoft.JSInterop.IJSStreamReference.OpenReadStreamAsync(long maxAllowedSize = 512000, long maxBufferSize = 102400, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +Microsoft.JSInterop.IJSStreamReference.OpenReadStreamAsync(long maxAllowedSize = 512000, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask Microsoft.JSInterop.Implementation.JSStreamReference Microsoft.JSInterop.Implementation.JSStreamReference.Length.get -> long Microsoft.JSInterop.Implementation.JSObjectReferenceJsonWorker @@ -36,7 +36,7 @@ static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JS static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, System.TimeSpan timeout, params object?[]? args) -> System.Threading.Tasks.ValueTask *REMOVED*static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object![]! args) -> System.Threading.Tasks.ValueTask static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object?[]? args) -> System.Threading.Tasks.ValueTask -virtual Microsoft.JSInterop.JSRuntime.ReadJSDataAsStreamAsync(Microsoft.JSInterop.IJSStreamReference! jsStreamReference, long totalLength, long maxBufferSize, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task! +virtual Microsoft.JSInterop.JSRuntime.ReadJSDataAsStreamAsync(Microsoft.JSInterop.IJSStreamReference! jsStreamReference, long totalLength, long pauseIncomingBytesThreshold, long resumeIncomingBytesThreshold, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task! virtual Microsoft.JSInterop.JSRuntime.ReceiveByteArray(int id, byte[]! data) -> void virtual Microsoft.JSInterop.JSRuntime.SendByteArray(int id, byte[]! data) -> void Microsoft.JSInterop.JSDisconnectedException diff --git a/src/JSInterop/Microsoft.JSInterop/test/JSRuntimeTest.cs b/src/JSInterop/Microsoft.JSInterop/test/JSRuntimeTest.cs index f887055564ac..8e27c40fe5a1 100644 --- a/src/JSInterop/Microsoft.JSInterop/test/JSRuntimeTest.cs +++ b/src/JSInterop/Microsoft.JSInterop/test/JSRuntimeTest.cs @@ -404,7 +404,7 @@ public async void ReadJSDataAsStreamAsync_ThrowsNotSupportedException() var dataReference = new JSStreamReference(runtime, 10, 10); // Act - var exception = await Assert.ThrowsAsync(async () => await runtime.ReadJSDataAsStreamAsync(dataReference, 10, 10, CancellationToken.None)); + var exception = await Assert.ThrowsAsync(async () => await runtime.ReadJSDataAsStreamAsync(dataReference, 10, 10, 10, CancellationToken.None)); // Assert Assert.Equal("The current JavaScript runtime does not support reading data streams.", exception.Message); From 1ea5be4226890f2a08d0f49fdac06ce08cd2be01 Mon Sep 17 00:00:00 2001 From: Tanay Parikh Date: Mon, 28 Jun 2021 22:56:52 -0700 Subject: [PATCH 2/6] Streaming CI Debugging (#33917) * CI Debugging * CiData message * CiData message * Update RemoteJSDataStream.cs * Remove Task.Delay * Update RemoteJSDataStream.cs * Update RemoteJSDataStream.cs --- .../Server/src/Circuits/RemoteJSDataStream.cs | 12 ++- .../test/Circuits/RemoteJSDataStreamTest.cs | 94 +++++++++++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs index 2180cdd77ea9..5dbec0acfe90 100644 --- a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs +++ b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs @@ -207,12 +207,22 @@ private async Task ThrowOnTimeout() } } - internal async Task CompletePipeAndDisposeStream(Exception? ex = null) + private async Task CompletePipeAndDisposeStream(Exception? ex = null) { await _pipe.Writer.CompleteAsync(ex); Dispose(true); } + /// + /// For testing purposes only. + /// + /// Triggers the timeout on the next check. + /// + internal void InvalidateLastDataReceivedTimeForTimeout() + { + _lastDataReceivedTime = _lastDataReceivedTime.Subtract(_jsInteropDefaultCallTimeout); + } + protected override void Dispose(bool disposing) { if (disposing) diff --git a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs index e3e43a593489..aabbcca50a4f 100644 --- a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs +++ b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs @@ -155,6 +155,100 @@ public async void ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDiscon Assert.Equal("Out of sequence chunk received, expected 5, but received 7.", ex.Message); } + [Fact] + public async void ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() + { + // Arrange + var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); + var timeoutExceptionRaisedSemaphore = new SemaphoreSlim(initialCount: 0, maxCount: 1); + jsRuntime.UnhandledException += (_, ex) => + { + Assert.Equal("Did not receive any data in the alloted time.", ex.Message); + Assert.IsType(ex); + timeoutExceptionRaisedSemaphore.Release(); + }; + + var jsStreamReference = Mock.Of(); + var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync( + jsRuntime, + jsStreamReference, + totalLength: 15, + maximumIncomingBytes: 10_000, + jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(10), // Note we're using a 10 second timeout for this test + pauseIncomingBytesThreshold: 50, + resumeIncomingBytesThreshold: 25, + cancellationToken: CancellationToken.None); + var streamId = GetStreamId(remoteJSDataStream, jsRuntime); + var chunk = new byte[] { 3, 5, 7 }; + + // Act & Assert 1 + // Trigger timeout and ensure unhandled exception raised to crush circuit + remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout(); + await timeoutExceptionRaisedSemaphore.WaitAsync(); + + // Act & Assert 2 + // Confirm exception also raised on pipe reader + using var mem = new MemoryStream(); + var ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem)); + Assert.Equal("Did not receive any data in the alloted time.", ex.Message); + + // Act & Assert 3 + // Ensures stream is disposed after the timeout and any additional chunks aren't accepted + var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null); + Assert.False(success); + } + + [Fact] + public async void ReceiveData_ReceivesDataThenTimesout_StreamDisposed() + { + // Arrange + var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); + var timeoutExceptionRaisedSemaphore = new SemaphoreSlim(initialCount: 0, maxCount: 1); + jsRuntime.UnhandledException += (_, ex) => + { + Assert.Equal("Did not receive any data in the alloted time.", ex.Message); + Assert.IsType(ex); + timeoutExceptionRaisedSemaphore.Release(); + }; + + var jsStreamReference = Mock.Of(); + var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync( + jsRuntime, + jsStreamReference, + totalLength: 15, + maximumIncomingBytes: 10_000, + jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(30), // Note we're using a 30 second timeout for this test + pauseIncomingBytesThreshold: 50, + resumeIncomingBytesThreshold: 25, + cancellationToken: CancellationToken.None); + var streamId = GetStreamId(remoteJSDataStream, jsRuntime); + var chunk = new byte[] { 3, 5, 7 }; + + // Act & Assert 1 + var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null); + Assert.True(success); + + // Act & Assert 2 + success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 1, chunk, error: null); + Assert.True(success); + + // Act & Assert 3 + // Trigger timeout and ensure unhandled exception raised to crush circuit + remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout(); + await timeoutExceptionRaisedSemaphore.WaitAsync(); + + // Act & Assert 4 + // Confirm exception also raised on pipe reader + using var mem = new MemoryStream(); + var ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem)); + Assert.Equal("Did not receive any data in the alloted time.", ex.Message); + + // Act & Assert 5 + // Ensures stream is disposed after the timeout and any additional chunks aren't accepted + success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 2, chunk, error: null); + Assert.False(success); + } + private static async Task CreateRemoteJSDataStreamAsync(TestRemoteJSRuntime jsRuntime = null) { var jsStreamReference = Mock.Of(); From 94ea80189cc5a0c712312090bde9257643e81dac Mon Sep 17 00:00:00 2001 From: Tanay Parikh Date: Tue, 29 Jun 2021 11:39:35 -0700 Subject: [PATCH 3/6] PR Feedback (Including PipeReader Property) --- .../Server/src/Circuits/RemoteJSDataStream.cs | 8 ++- .../test/Circuits/RemoteJSDataStreamTest.cs | 50 +++++++++++++++---- 2 files changed, 46 insertions(+), 12 deletions(-) diff --git a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs index 5dbec0acfe90..b3a347066750 100644 --- a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs +++ b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs @@ -82,8 +82,14 @@ private RemoteJSDataStream( _pipe = new Pipe(new PipeOptions(pauseWriterThreshold: pauseIncomingBytesThreshold, resumeWriterThreshold: resumeIncomingBytesThreshold)); _pipeReaderStream = _pipe.Reader.AsStream(); + PipeReader = _pipe.Reader; } + /// + /// Gets a to directly read data sent by the JavaScript client. + /// + public PipeReader PipeReader { get; } + private async Task ReceiveData(long chunkId, byte[] chunk, string error) { try @@ -201,7 +207,7 @@ private async Task ThrowOnTimeout() if (!_disposed && (DateTimeOffset.UtcNow >= _lastDataReceivedTime.Add(_jsInteropDefaultCallTimeout))) { // Dispose of the stream if a chunk isn't received within the jsInteropDefaultCallTimeout. - var timeoutException = new TimeoutException("Did not receive any data in the alloted time."); + var timeoutException = new TimeoutException("Did not receive any data in the allotted time."); await CompletePipeAndDisposeStream(timeoutException); _runtime.RaiseUnhandledException(timeoutException); } diff --git a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs index aabbcca50a4f..3b1ebb8bfa78 100644 --- a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs +++ b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs @@ -20,7 +20,7 @@ public class RemoteJSDataStreamTest private static readonly TestRemoteJSRuntime _jsRuntime = new(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); [Fact] - public async void CreateRemoteJSDataStreamAsync_CreatesStream() + public async Task CreateRemoteJSDataStreamAsync_CreatesStream() { // Arrange var jsStreamReference = Mock.Of(); @@ -33,7 +33,7 @@ public async void CreateRemoteJSDataStreamAsync_CreatesStream() } [Fact] - public async void ReceiveData_DoesNotFindStream() + public async Task ReceiveData_DoesNotFindStream() { // Arrange var chunk = new byte[] { 3, 5, 6, 7 }; @@ -47,7 +47,7 @@ public async void ReceiveData_DoesNotFindStream() } [Fact] - public async void ReceiveData_SuccessReadsBackStream() + public async Task ReceiveData_SuccessReadsBackStream() { // Arrange var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); @@ -75,7 +75,35 @@ public async void ReceiveData_SuccessReadsBackStream() } [Fact] - public async void ReceiveData_WithError() + public async Task ReceiveData_SuccessReadsBackPipeReader() + { + // Arrange + var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); + var remoteJSDataStream = await CreateRemoteJSDataStreamAsync(jsRuntime); + var streamId = GetStreamId(remoteJSDataStream, jsRuntime); + var chunk = new byte[100]; + var random = new Random(); + random.NextBytes(chunk); + + var sendDataTask = Task.Run(async () => + { + // Act 1 + var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null); + return success; + }); + + // Act & Assert 2 + using var memoryStream = new MemoryStream(); + await remoteJSDataStream.PipeReader.CopyToAsync(memoryStream); + Assert.Equal(chunk, memoryStream.ToArray()); + + // Act & Assert 3 + var sendDataCompleted = await sendDataTask; + Assert.True(sendDataCompleted); + } + + [Fact] + public async Task ReceiveData_WithError() { // Arrange var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); @@ -93,7 +121,7 @@ public async void ReceiveData_WithError() } [Fact] - public async void ReceiveData_WithZeroLengthChunk() + public async Task ReceiveData_WithZeroLengthChunk() { // Arrange var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); @@ -112,7 +140,7 @@ public async void ReceiveData_WithZeroLengthChunk() } [Fact] - public async void ReceiveData_ProvidedWithMoreBytesThanRemaining() + public async Task ReceiveData_ProvidedWithMoreBytesThanRemaining() { // Arrange var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); @@ -132,7 +160,7 @@ public async void ReceiveData_ProvidedWithMoreBytesThanRemaining() } [Fact] - public async void ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDisconnect() + public async Task ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDisconnect() { // Arrange var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); @@ -156,7 +184,7 @@ public async void ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDiscon } [Fact] - public async void ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() + public async Task ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() { // Arrange var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); @@ -190,7 +218,7 @@ public async void ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() // Confirm exception also raised on pipe reader using var mem = new MemoryStream(); var ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem)); - Assert.Equal("Did not receive any data in the alloted time.", ex.Message); + Assert.Equal("Did not receive any data in the allotted time.", ex.Message); // Act & Assert 3 // Ensures stream is disposed after the timeout and any additional chunks aren't accepted @@ -199,14 +227,14 @@ public async void ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() } [Fact] - public async void ReceiveData_ReceivesDataThenTimesout_StreamDisposed() + public async Task ReceiveData_ReceivesDataThenTimesout_StreamDisposed() { // Arrange var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); var timeoutExceptionRaisedSemaphore = new SemaphoreSlim(initialCount: 0, maxCount: 1); jsRuntime.UnhandledException += (_, ex) => { - Assert.Equal("Did not receive any data in the alloted time.", ex.Message); + Assert.Equal("Did not receive any data in the allotted time.", ex.Message); Assert.IsType(ex); timeoutExceptionRaisedSemaphore.Release(); }; From 77857519453297f53f5fb77d5315d01295a57d6a Mon Sep 17 00:00:00 2001 From: Tanay Parikh Date: Tue, 29 Jun 2021 12:27:30 -0700 Subject: [PATCH 4/6] Improve documentation on safe defaults --- .../Server/src/Circuits/RemoteJSDataStream.cs | 6 +++--- .../Server/src/Circuits/RemoteJSRuntime.cs | 2 +- .../Microsoft.JSInterop/src/IJSStreamReference.cs | 13 +++++++++++-- .../Microsoft.JSInterop/src/JSRuntime.cs | 15 ++++++++++++--- .../src/PublicAPI.Unshipped.txt | 2 +- 5 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs index b3a347066750..f5bc58509518 100644 --- a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs +++ b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs @@ -42,9 +42,9 @@ public static async ValueTask CreateRemoteJSDataStreamAsync( long totalLength, long maximumIncomingBytes, TimeSpan jsInteropDefaultCallTimeout, - long pauseIncomingBytesThreshold, - long resumeIncomingBytesThreshold, - CancellationToken cancellationToken) + long pauseIncomingBytesThreshold = -1, + long resumeIncomingBytesThreshold = -1, + CancellationToken cancellationToken = default) { // Enforce minimum 1 kb, maximum 50 kb, SignalR message size. // We budget 512 bytes overhead for the transfer, thus leaving at least 512 bytes for data diff --git a/src/Components/Server/src/Circuits/RemoteJSRuntime.cs b/src/Components/Server/src/Circuits/RemoteJSRuntime.cs index 0ceda9b752e8..f51fefb27ff1 100644 --- a/src/Components/Server/src/Circuits/RemoteJSRuntime.cs +++ b/src/Components/Server/src/Circuits/RemoteJSRuntime.cs @@ -157,7 +157,7 @@ public void MarkPermanentlyDisconnected() _clientProxy = null; } - protected override async Task ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long pauseIncomingBytesThreshold, long resumeIncomingBytesThreshold, CancellationToken cancellationToken) + protected override async Task ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, CancellationToken cancellationToken = default) => await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(this, jsStreamReference, totalLength, _maximumIncomingBytes, _options.JSInteropDefaultCallTimeout, pauseIncomingBytesThreshold, resumeIncomingBytesThreshold, cancellationToken); public static class Log diff --git a/src/JSInterop/Microsoft.JSInterop/src/IJSStreamReference.cs b/src/JSInterop/Microsoft.JSInterop/src/IJSStreamReference.cs index 5fa34ca4aeb6..465bb6ddb08c 100644 --- a/src/JSInterop/Microsoft.JSInterop/src/IJSStreamReference.cs +++ b/src/JSInterop/Microsoft.JSInterop/src/IJSStreamReference.cs @@ -22,8 +22,17 @@ public interface IJSStreamReference : IAsyncDisposable /// Opens a with the for the current data reference. /// /// Maximum number of bytes permitted to be read from JavaScript. - /// The number of unconsumed bytes to accept from JS before blocking. A value of zero prevents JS from blocking, allowing .NET to receive an unlimited number of bytes. - /// The number of unflushed bytes at which point JS stops blocking. + /// + /// The number of unconsumed bytes to accept from JS before blocking. + /// Defaults to -1, which indicates use of the default . + /// Avoid specifying an excessively large value because this could allow clients to exhaust memory. + /// A value of zero prevents JS from blocking, allowing .NET to receive an unlimited number of bytes. + /// + /// + /// The number of unflushed bytes at which point JS stops blocking. + /// Defaults to -1, which indicates use of the default . + /// Must be less than the to prevent thrashing at the limit. + /// /// for cancelling read. /// which can provide data associated with the current data reference. ValueTask OpenReadStreamAsync(long maxAllowedSize = 512000, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, CancellationToken cancellationToken = default); diff --git a/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs b/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs index a3c8fbc99bb1..840dd8d702e1 100644 --- a/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs +++ b/src/JSInterop/Microsoft.JSInterop/src/JSRuntime.cs @@ -215,11 +215,20 @@ protected internal virtual void ReceiveByteArray(int id, byte[] data) /// /// to produce a data stream for. /// Expected length of the incoming data stream. - /// The number of unconsumed bytes to accept from JS before blocking. A value of zero prevents JS from blocking, allowing .NET to receive an unlimited number of bytes. - /// The number of unflushed bytes at which point JS stops blocking. + /// + /// The number of unconsumed bytes to accept from JS before blocking. + /// Defaults to -1, which indicates use of the default . + /// Avoid specifying an excessively large value because this could allow clients to exhaust memory. + /// A value of zero prevents JS from blocking, allowing .NET to receive an unlimited number of bytes. + /// + /// + /// The number of unflushed bytes at which point JS stops blocking. + /// Defaults to -1, which indicates use of the default . + /// Must be less than the to prevent thrashing at the limit. + /// /// for cancelling read. /// for the data reference represented by . - protected internal virtual Task ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long pauseIncomingBytesThreshold, long resumeIncomingBytesThreshold, CancellationToken cancellationToken) + protected internal virtual Task ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, CancellationToken cancellationToken = default) { // The reason it's virtual and not abstract is just for back-compat diff --git a/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt b/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt index 3340aecfb9cb..5eaf7afec314 100644 --- a/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt +++ b/src/JSInterop/Microsoft.JSInterop/src/PublicAPI.Unshipped.txt @@ -36,7 +36,7 @@ static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JS static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, System.TimeSpan timeout, params object?[]? args) -> System.Threading.Tasks.ValueTask *REMOVED*static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object![]! args) -> System.Threading.Tasks.ValueTask static Microsoft.JSInterop.JSRuntimeExtensions.InvokeVoidAsync(this Microsoft.JSInterop.IJSRuntime! jsRuntime, string! identifier, params object?[]? args) -> System.Threading.Tasks.ValueTask -virtual Microsoft.JSInterop.JSRuntime.ReadJSDataAsStreamAsync(Microsoft.JSInterop.IJSStreamReference! jsStreamReference, long totalLength, long pauseIncomingBytesThreshold, long resumeIncomingBytesThreshold, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task! +virtual Microsoft.JSInterop.JSRuntime.ReadJSDataAsStreamAsync(Microsoft.JSInterop.IJSStreamReference! jsStreamReference, long totalLength, long pauseIncomingBytesThreshold = -1, long resumeIncomingBytesThreshold = -1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! virtual Microsoft.JSInterop.JSRuntime.ReceiveByteArray(int id, byte[]! data) -> void virtual Microsoft.JSInterop.JSRuntime.SendByteArray(int id, byte[]! data) -> void Microsoft.JSInterop.JSDisconnectedException From 1cd162f6eed4f7249081ad9f877ce83456dd35c3 Mon Sep 17 00:00:00 2001 From: Tanay Parikh Date: Tue, 29 Jun 2021 13:18:16 -0700 Subject: [PATCH 5/6] Task Completion Source & DefaultTimeout --- .../Server/src/Circuits/RemoteJSDataStream.cs | 10 --- .../test/Circuits/RemoteJSDataStreamTest.cs | 75 +++++++++---------- 2 files changed, 37 insertions(+), 48 deletions(-) diff --git a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs index f5bc58509518..fb66d5d6fcef 100644 --- a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs +++ b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs @@ -219,16 +219,6 @@ private async Task CompletePipeAndDisposeStream(Exception? ex = null) Dispose(true); } - /// - /// For testing purposes only. - /// - /// Triggers the timeout on the next check. - /// - internal void InvalidateLastDataReceivedTimeForTimeout() - { - _lastDataReceivedTime = _lastDataReceivedTime.Subtract(_jsInteropDefaultCallTimeout); - } - protected override void Dispose(bool disposing) { if (disposing) diff --git a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs index 3b1ebb8bfa78..e266f4fc8c74 100644 --- a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs +++ b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; +using Microsoft.AspNetCore.Testing; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Microsoft.JSInterop; @@ -26,7 +27,7 @@ public async Task CreateRemoteJSDataStreamAsync_CreatesStream() var jsStreamReference = Mock.Of(); // Act - var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(_jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None); + var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(_jsRuntime, jsStreamReference, totalLength: 100, maximumIncomingBytes: 10_000, jsInteropDefaultCallTimeout: TimeSpan.FromMinutes(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None).DefaultTimeout(); // Assert Assert.NotNull(remoteJSDataStream); @@ -40,7 +41,7 @@ public async Task ReceiveData_DoesNotFindStream() var unrecognizedGuid = 10; // Act - var success = await RemoteJSDataStream.ReceiveData(_jsRuntime, streamId: unrecognizedGuid, chunkId: 0, chunk, error: null); + var success = await RemoteJSDataStream.ReceiveData(_jsRuntime, streamId: unrecognizedGuid, chunkId: 0, chunk, error: null).DefaultTimeout(); // Assert Assert.False(success); @@ -60,17 +61,17 @@ public async Task ReceiveData_SuccessReadsBackStream() var sendDataTask = Task.Run(async () => { // Act 1 - var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null); + var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout(); return success; }); // Act & Assert 2 using var memoryStream = new MemoryStream(); - await remoteJSDataStream.CopyToAsync(memoryStream); + await remoteJSDataStream.CopyToAsync(memoryStream).DefaultTimeout(); Assert.Equal(chunk, memoryStream.ToArray()); // Act & Assert 3 - var sendDataCompleted = await sendDataTask; + var sendDataCompleted = await sendDataTask.DefaultTimeout(); Assert.True(sendDataCompleted); } @@ -88,17 +89,17 @@ public async Task ReceiveData_SuccessReadsBackPipeReader() var sendDataTask = Task.Run(async () => { // Act 1 - var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null); + var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout(); return success; }); // Act & Assert 2 using var memoryStream = new MemoryStream(); - await remoteJSDataStream.PipeReader.CopyToAsync(memoryStream); + await remoteJSDataStream.PipeReader.CopyToAsync(memoryStream).DefaultTimeout(); Assert.Equal(chunk, memoryStream.ToArray()); // Act & Assert 3 - var sendDataCompleted = await sendDataTask; + var sendDataCompleted = await sendDataTask.DefaultTimeout(); Assert.True(sendDataCompleted); } @@ -111,12 +112,12 @@ public async Task ReceiveData_WithError() var streamId = GetStreamId(remoteJSDataStream, jsRuntime); // Act & Assert 1 - var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk: null, error: "some error"); + var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk: null, error: "some error").DefaultTimeout(); Assert.False(success); // Act & Assert 2 using var mem = new MemoryStream(); - var ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem)); + var ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout()); Assert.Equal("An error occurred while reading the remote stream: some error", ex.Message); } @@ -130,12 +131,12 @@ public async Task ReceiveData_WithZeroLengthChunk() var chunk = Array.Empty(); // Act & Assert 1 - var ex = await Assert.ThrowsAsync(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null)); + var ex = await Assert.ThrowsAsync(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout()); Assert.Equal("The incoming data chunk cannot be empty.", ex.Message); // Act & Assert 2 using var mem = new MemoryStream(); - ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem)); + ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout()); Assert.Equal("The incoming data chunk cannot be empty.", ex.Message); } @@ -150,12 +151,12 @@ public async Task ReceiveData_ProvidedWithMoreBytesThanRemaining() var chunk = new byte[110]; // 100 byte totalLength for stream // Act & Assert 1 - var ex = await Assert.ThrowsAsync(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null)); + var ex = await Assert.ThrowsAsync(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout()); Assert.Equal("The incoming data stream declared a length 100, but 110 bytes were sent.", ex.Message); // Act & Assert 2 using var mem = new MemoryStream(); - ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem)); + ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout()); Assert.Equal("The incoming data stream declared a length 100, but 110 bytes were sent.", ex.Message); } @@ -174,12 +175,12 @@ public async Task ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDiscon { await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: i, chunk, error: null); } - var ex = await Assert.ThrowsAsync(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 7, chunk, error: null)); + var ex = await Assert.ThrowsAsync(async () => await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 7, chunk, error: null).DefaultTimeout()); Assert.Equal("Out of sequence chunk received, expected 5, but received 7.", ex.Message); // Act & Assert 2 using var mem = new MemoryStream(); - ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem)); + ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout()); Assert.Equal("Out of sequence chunk received, expected 5, but received 7.", ex.Message); } @@ -187,13 +188,12 @@ public async Task ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDiscon public async Task ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() { // Arrange + var unhandledExceptionRaisedTask = new TaskCompletionSource(); var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); - var timeoutExceptionRaisedSemaphore = new SemaphoreSlim(initialCount: 0, maxCount: 1); jsRuntime.UnhandledException += (_, ex) => { - Assert.Equal("Did not receive any data in the alloted time.", ex.Message); - Assert.IsType(ex); - timeoutExceptionRaisedSemaphore.Release(); + Assert.Equal("Did not receive any data in the allotted time.", ex.Message); + unhandledExceptionRaisedTask.SetResult(ex is TimeoutException); }; var jsStreamReference = Mock.Of(); @@ -202,7 +202,7 @@ public async Task ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() jsStreamReference, totalLength: 15, maximumIncomingBytes: 10_000, - jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(10), // Note we're using a 10 second timeout for this test + jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(1), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None); @@ -210,19 +210,19 @@ public async Task ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() var chunk = new byte[] { 3, 5, 7 }; // Act & Assert 1 - // Trigger timeout and ensure unhandled exception raised to crush circuit - remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout(); - await timeoutExceptionRaisedSemaphore.WaitAsync(); + // Ensure unhandled exception raised to crush circuit + var unhandledExceptionResult = await unhandledExceptionRaisedTask.Task.DefaultTimeout(); + Assert.True(unhandledExceptionResult); // Act & Assert 2 // Confirm exception also raised on pipe reader using var mem = new MemoryStream(); - var ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem)); + var ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout()); Assert.Equal("Did not receive any data in the allotted time.", ex.Message); // Act & Assert 3 // Ensures stream is disposed after the timeout and any additional chunks aren't accepted - var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null); + var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout(); Assert.False(success); } @@ -230,13 +230,12 @@ public async Task ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() public async Task ReceiveData_ReceivesDataThenTimesout_StreamDisposed() { // Arrange + var unhandledExceptionRaisedTask = new TaskCompletionSource(); var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of>()); - var timeoutExceptionRaisedSemaphore = new SemaphoreSlim(initialCount: 0, maxCount: 1); jsRuntime.UnhandledException += (_, ex) => { Assert.Equal("Did not receive any data in the allotted time.", ex.Message); - Assert.IsType(ex); - timeoutExceptionRaisedSemaphore.Release(); + unhandledExceptionRaisedTask.SetResult(ex is TimeoutException); }; var jsStreamReference = Mock.Of(); @@ -245,7 +244,7 @@ public async Task ReceiveData_ReceivesDataThenTimesout_StreamDisposed() jsStreamReference, totalLength: 15, maximumIncomingBytes: 10_000, - jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(30), // Note we're using a 30 second timeout for this test + jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(3), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None); @@ -253,27 +252,27 @@ public async Task ReceiveData_ReceivesDataThenTimesout_StreamDisposed() var chunk = new byte[] { 3, 5, 7 }; // Act & Assert 1 - var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null); + var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null).DefaultTimeout(); Assert.True(success); // Act & Assert 2 - success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 1, chunk, error: null); + success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 1, chunk, error: null).DefaultTimeout(); Assert.True(success); // Act & Assert 3 - // Trigger timeout and ensure unhandled exception raised to crush circuit - remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout(); - await timeoutExceptionRaisedSemaphore.WaitAsync(); + // Ensure unhandled exception raised to crush circuit + var unhandledExceptionResult = await unhandledExceptionRaisedTask.Task.DefaultTimeout(); + Assert.True(unhandledExceptionResult); // Act & Assert 4 // Confirm exception also raised on pipe reader using var mem = new MemoryStream(); - var ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem)); - Assert.Equal("Did not receive any data in the alloted time.", ex.Message); + var ex = await Assert.ThrowsAsync(async () => await remoteJSDataStream.CopyToAsync(mem).DefaultTimeout()); + Assert.Equal("Did not receive any data in the allotted time.", ex.Message); // Act & Assert 5 // Ensures stream is disposed after the timeout and any additional chunks aren't accepted - success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 2, chunk, error: null); + success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 2, chunk, error: null).DefaultTimeout(); Assert.False(success); } From 7961c34a2bccedc53a733f091a528b2dc9541b4b Mon Sep 17 00:00:00 2001 From: Tanay Parikh Date: Tue, 29 Jun 2021 14:57:14 -0700 Subject: [PATCH 6/6] InvalidateLastDataReceivedTimeForTimeout --- .../Server/src/Circuits/RemoteJSDataStream.cs | 10 ++++++++++ .../Server/test/Circuits/RemoteJSDataStreamTest.cs | 8 +++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs index fb66d5d6fcef..7c3f27853f43 100644 --- a/src/Components/Server/src/Circuits/RemoteJSDataStream.cs +++ b/src/Components/Server/src/Circuits/RemoteJSDataStream.cs @@ -213,6 +213,16 @@ private async Task ThrowOnTimeout() } } + /// + /// For testing purposes only. + /// + /// Triggers the timeout on the next check. + /// + internal void InvalidateLastDataReceivedTimeForTimeout() + { + _lastDataReceivedTime = _lastDataReceivedTime.Subtract(_jsInteropDefaultCallTimeout); + } + private async Task CompletePipeAndDisposeStream(Exception? ex = null) { await _pipe.Writer.CompleteAsync(ex); diff --git a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs index e266f4fc8c74..85cf6bd28957 100644 --- a/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs +++ b/src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs @@ -202,7 +202,7 @@ public async Task ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() jsStreamReference, totalLength: 15, maximumIncomingBytes: 10_000, - jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(1), + jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(2), pauseIncomingBytesThreshold: 50, resumeIncomingBytesThreshold: 25, cancellationToken: CancellationToken.None); @@ -210,7 +210,8 @@ public async Task ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed() var chunk = new byte[] { 3, 5, 7 }; // Act & Assert 1 - // Ensure unhandled exception raised to crush circuit + // Trigger timeout and ensure unhandled exception raised to crush circuit + remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout(); var unhandledExceptionResult = await unhandledExceptionRaisedTask.Task.DefaultTimeout(); Assert.True(unhandledExceptionResult); @@ -260,7 +261,8 @@ public async Task ReceiveData_ReceivesDataThenTimesout_StreamDisposed() Assert.True(success); // Act & Assert 3 - // Ensure unhandled exception raised to crush circuit + // Trigger timeout and ensure unhandled exception raised to crush circuit + remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout(); var unhandledExceptionResult = await unhandledExceptionRaisedTask.Task.DefaultTimeout(); Assert.True(unhandledExceptionResult);