From 864023a458ef7a775c115d3305c4304685fc0cf2 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Thu, 19 Jul 2018 16:24:14 -0700 Subject: [PATCH 1/3] Fix ReadAsync not returning when cancellation token fires during FlushAsync --- .../src/System/IO/Pipelines/Pipe.cs | 26 +++---- .../tests/FlushAsyncCancellationTests.cs | 75 +++++++++++++++++++ 2 files changed, 88 insertions(+), 13 deletions(-) diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 309cb12abc32..4acf4ce17ec9 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -336,6 +336,19 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) // AttachToken before completing reader awaiter in case cancellationToken is already completed cancellationTokenRegistration = _writerAwaitable.AttachToken(cancellationToken, s_signalWriterAwaitable, this); + // If the writer is completed (which it will be most of the time) the return a completed ValueTask + if (_writerAwaitable.IsCompleted) + { + var flushResult = new FlushResult(); + GetFlushResult(ref flushResult); + result = new ValueTask(flushResult); + } + else + { + // Otherwise it's async + result = new ValueTask(_writer, token: 0); + } + // Complete reader only if new data was pushed into the pipe if (!wasEmpty) { @@ -349,19 +362,6 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) // I couldn't find a way for flush to induce backpressure deadlock // if it always adds new data to pipe and wakes up the reader but assert anyway Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted); - - // If the writer is completed (which it will be most of the time) the return a completed ValueTask - if (_writerAwaitable.IsCompleted) - { - var flushResult = new FlushResult(); - GetFlushResult(ref flushResult); - result = new ValueTask(flushResult); - } - else - { - // Otherwise it's async - result = new ValueTask(_writer, token: 0); - } } cancellationTokenRegistration.Dispose(); diff --git a/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs b/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs index 8640404276ee..c12858c83925 100644 --- a/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs +++ b/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs @@ -325,6 +325,81 @@ public async Task FlushAsyncThrowsIfPassedCanceledCancellationTokenAndPipeIsAble Assert.True(task.IsCompleted); Assert.True(task.Result.IsCompleted); } + + [Fact] + public void ReadAsyncCompletesIfFlushAsyncCanceledMidFlush() + { + // This test tries to get pipe into a state where ReadAsync is being awaited + // and FlushAsync is cancelled while the method is running + Pipe = new Pipe(); + var autoResetEvent = new ManualResetEvent(false); + + var cancellationTokenSource = new CancellationTokenSource(); + var writer = Task.Run(async () => + { + var cancellations = 0; + while (cancellations < 20) + { + try + { + // We want reader to be awaiting + autoResetEvent.WaitOne(); + Pipe.Writer.WriteEmpty(1); + + // We want the token to be cancelled during FlushAsync call + // check it it's already cancelled and try a new one + if (cancellationTokenSource.Token.IsCancellationRequested) + { + cancellationTokenSource = new CancellationTokenSource(); + continue; + } + + await Pipe.Writer.FlushAsync(cancellationTokenSource.Token); + autoResetEvent.Reset(); + } + catch (OperationCanceledException) + { + cancellations ++; + } + } + + Pipe.Writer.Complete(); + return; + }); + + var reader = Task.Run(async () => + { + while (true) + { + var readTask = Pipe.Reader.ReadAsync(); + + // Signal writer to initiate a flush + if (!readTask.IsCompleted) + { + autoResetEvent.Set(); + } + + var result = await readTask; + + if (result.Buffer.IsEmpty) + { + return; + } + + Pipe.Reader.AdvanceTo(result.Buffer.End); + } + }); + + var canceller = Task.Run(() => + { + while (!writer.IsCompleted) + { + cancellationTokenSource.Cancel(); + } + }); + + Assert.True(Task.WaitAll(new [] { writer, reader, canceller }, TimeSpan.FromSeconds(30)), "Reader was not completed in reasonable time"); + } } public static class TestWriterExtensions From 19b65088fe33dd073a8173a6bd8d2895c4b7143b Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 23 Jul 2018 17:14:54 -0700 Subject: [PATCH 2/3] Fix tests --- .../src/System/IO/Pipelines/PipeAwaitable.cs | 13 +++++++++++-- .../tests/FlushAsyncCancellationTests.cs | 8 ++++---- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs index 8bb16cfcccd9..c3c5358135b6 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs @@ -60,14 +60,23 @@ public CancellationTokenRegistration AttachToken(CancellationToken cancellationT public void Complete(out CompletionData completionData) { Action currentCompletion = _completion; + object currentState = _completionState; + _completion = s_awaitableIsCompleted; + _completionState = null; completionData = default; - + if (!ReferenceEquals(currentCompletion, s_awaitableIsCompleted) && !ReferenceEquals(currentCompletion, s_awaitableIsNotCompleted)) { - completionData = new CompletionData(currentCompletion, _completionState, _executionContext, _synchronizationContext); + completionData = new CompletionData(currentCompletion, currentState, _executionContext, _synchronizationContext); + } + else if (_canceledState == CanceledState.CancellationRequested) + { + // Make sure we won't reset the awaitable in ObserveCancellation + // If Complete happens in between Cancel and ObserveCancellation + _canceledState = CanceledState.CancellationPreRequested; } } diff --git a/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs b/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs index c12858c83925..7894e13c7969 100644 --- a/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs +++ b/src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs @@ -332,7 +332,7 @@ public void ReadAsyncCompletesIfFlushAsyncCanceledMidFlush() // This test tries to get pipe into a state where ReadAsync is being awaited // and FlushAsync is cancelled while the method is running Pipe = new Pipe(); - var autoResetEvent = new ManualResetEvent(false); + var resetEvent = new ManualResetEvent(false); var cancellationTokenSource = new CancellationTokenSource(); var writer = Task.Run(async () => @@ -343,7 +343,7 @@ public void ReadAsyncCompletesIfFlushAsyncCanceledMidFlush() try { // We want reader to be awaiting - autoResetEvent.WaitOne(); + resetEvent.WaitOne(); Pipe.Writer.WriteEmpty(1); // We want the token to be cancelled during FlushAsync call @@ -355,7 +355,6 @@ public void ReadAsyncCompletesIfFlushAsyncCanceledMidFlush() } await Pipe.Writer.FlushAsync(cancellationTokenSource.Token); - autoResetEvent.Reset(); } catch (OperationCanceledException) { @@ -376,10 +375,11 @@ public void ReadAsyncCompletesIfFlushAsyncCanceledMidFlush() // Signal writer to initiate a flush if (!readTask.IsCompleted) { - autoResetEvent.Set(); + resetEvent.Set(); } var result = await readTask; + resetEvent.Reset(); if (result.Buffer.IsEmpty) { From 55a679923f30129e1cd22b5f8246c2aa48760001 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Tue, 24 Jul 2018 08:20:00 -0700 Subject: [PATCH 3/3] Typo --- src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 4acf4ce17ec9..a3a897ca3db9 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -336,7 +336,7 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) // AttachToken before completing reader awaiter in case cancellationToken is already completed cancellationTokenRegistration = _writerAwaitable.AttachToken(cancellationToken, s_signalWriterAwaitable, this); - // If the writer is completed (which it will be most of the time) the return a completed ValueTask + // If the writer is completed (which it will be most of the time) then return a completed ValueTask if (_writerAwaitable.IsCompleted) { var flushResult = new FlushResult();