diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 309cb12abc32..a3a897ca3db9 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -336,6 +336,19 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) // AttachToken before completing reader awaiter in case cancellationToken is already completed cancellationTokenRegistration = _writerAwaitable.AttachToken(cancellationToken, s_signalWriterAwaitable, this); + // If the writer is completed (which it will be most of the time) then return a completed ValueTask + if (_writerAwaitable.IsCompleted) + { + var flushResult = new FlushResult(); + GetFlushResult(ref flushResult); + result = new ValueTask(flushResult); + } + else + { + // Otherwise it's async + result = new ValueTask(_writer, token: 0); + } + // Complete reader only if new data was pushed into the pipe if (!wasEmpty) { @@ -349,19 +362,6 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) // I couldn't find a way for flush to induce backpressure deadlock // if it always adds new data to pipe and wakes up the reader but assert anyway Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted); - - // If the writer is completed (which it will be most of the time) the return a completed ValueTask - if (_writerAwaitable.IsCompleted) - { - var flushResult = new FlushResult(); - GetFlushResult(ref flushResult); - result = new ValueTask(flushResult); - } - else - { - // Otherwise it's async - result = new ValueTask(_writer, token: 0); - } } cancellationTokenRegistration.Dispose(); diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs index 8bb16cfcccd9..c3c5358135b6 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs @@ -60,14 +60,23 @@ public CancellationTokenRegistration AttachToken(CancellationToken cancellationT public void Complete(out CompletionData completionData) { Action currentCompletion = _completion; + object currentState = _completionState; + _completion = s_awaitableIsCompleted; + _completionState = null; completionData = default; - + if (!ReferenceEquals(currentCompletion, s_awaitableIsCompleted) && !ReferenceEquals(currentCompletion, s_awaitableIsNotCompleted)) { - completionData = new CompletionData(currentCompletion, _completionState, _executionContext, _synchronizationContext); + completionData = new CompletionData(currentCompletion, currentState, _executionContext, _synchronizationContext); + } + else if (_canceledState == CanceledState.CancellationRequested) + { + // Make sure we won't reset the awaitable in ObserveCancellation + // If Complete happens in between Cancel and ObserveCancellation + _canceledState = CanceledState.CancellationPreRequested; } } diff --git a/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs b/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs index 8640404276ee..7894e13c7969 100644 --- a/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs +++ b/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs @@ -325,6 +325,81 @@ public async Task FlushAsyncThrowsIfPassedCanceledCancellationTokenAndPipeIsAble Assert.True(task.IsCompleted); Assert.True(task.Result.IsCompleted); } + + [Fact] + public void ReadAsyncCompletesIfFlushAsyncCanceledMidFlush() + { + // This test tries to get pipe into a state where ReadAsync is being awaited + // and FlushAsync is cancelled while the method is running + Pipe = new Pipe(); + var resetEvent = new ManualResetEvent(false); + + var cancellationTokenSource = new CancellationTokenSource(); + var writer = Task.Run(async () => + { + var cancellations = 0; + while (cancellations < 20) + { + try + { + // We want reader to be awaiting + resetEvent.WaitOne(); + Pipe.Writer.WriteEmpty(1); + + // We want the token to be cancelled during FlushAsync call + // check it it's already cancelled and try a new one + if (cancellationTokenSource.Token.IsCancellationRequested) + { + cancellationTokenSource = new CancellationTokenSource(); + continue; + } + + await Pipe.Writer.FlushAsync(cancellationTokenSource.Token); + } + catch (OperationCanceledException) + { + cancellations ++; + } + } + + Pipe.Writer.Complete(); + return; + }); + + var reader = Task.Run(async () => + { + while (true) + { + var readTask = Pipe.Reader.ReadAsync(); + + // Signal writer to initiate a flush + if (!readTask.IsCompleted) + { + resetEvent.Set(); + } + + var result = await readTask; + resetEvent.Reset(); + + if (result.Buffer.IsEmpty) + { + return; + } + + Pipe.Reader.AdvanceTo(result.Buffer.End); + } + }); + + var canceller = Task.Run(() => + { + while (!writer.IsCompleted) + { + cancellationTokenSource.Cancel(); + } + }); + + Assert.True(Task.WaitAll(new [] { writer, reader, canceller }, TimeSpan.FromSeconds(30)), "Reader was not completed in reasonable time"); + } } public static class TestWriterExtensions