diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs index b9b582c6589c..22ddbf1e7118 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs @@ -13,25 +13,28 @@ namespace System.IO.Pipelines internal struct PipeCompletion { private static readonly ArrayPool s_completionCallbackPool = ArrayPool.Shared; - private static readonly Exception s_completedNoException = new Exception(); private const int InitialCallbacksSize = 1; - private Exception _exception; + private bool _isCompleted; + private ExceptionDispatchInfo _exceptionInfo; private PipeCompletionCallback[] _callbacks; private int _callbackCount; - public bool IsCompleted => _exception != null; + public bool IsCompleted => _isCompleted; - public bool IsFaulted => IsCompleted && _exception != s_completedNoException; + public bool IsFaulted => _exceptionInfo != null; public PipeCompletionCallbacks TryComplete(Exception exception = null) { - if (_exception == null) + if (!_isCompleted) { - // Set the exception object to the exception passed in or a sentinel value - _exception = exception ?? s_completedNoException; + _isCompleted = true; + if (exception != null) + { + _exceptionInfo = ExceptionDispatchInfo.Capture(exception); + } } return GetCallbacks(); } @@ -68,12 +71,12 @@ public PipeCompletionCallbacks AddCallback(Action callback, o [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool IsCompletedOrThrow() { - if (_exception == null) + if (!_isCompleted) { return false; } - if (_exception != s_completedNoException) + if (_exceptionInfo != null) { ThrowLatchedException(); } @@ -91,7 +94,7 @@ private PipeCompletionCallbacks GetCallbacks() var callbacks = new PipeCompletionCallbacks(s_completionCallbackPool, _callbackCount, - _exception == s_completedNoException ? null : _exception, + _exceptionInfo?.SourceException, _callbacks); _callbacks = null; @@ -103,13 +106,14 @@ public void Reset() { Debug.Assert(IsCompleted); Debug.Assert(_callbacks == null); - _exception = null; + _isCompleted = false; + _exceptionInfo = null; } [MethodImpl(MethodImplOptions.NoInlining)] private void ThrowLatchedException() { - ExceptionDispatchInfo.Capture(_exception).Throw(); + _exceptionInfo.Throw(); } public override string ToString() diff --git a/src/System.IO.Pipelines/tests/FlushAsyncCompletionTests.cs b/src/System.IO.Pipelines/tests/FlushAsyncCompletionTests.cs index 7579adc7073e..e64e2af867ce 100644 --- a/src/System.IO.Pipelines/tests/FlushAsyncCompletionTests.cs +++ b/src/System.IO.Pipelines/tests/FlushAsyncCompletionTests.cs @@ -33,5 +33,33 @@ async Task Await(ValueTask a) Assert.Equal(true, task2.IsFaulted); Assert.Equal("Concurrent reads or writes are not supported.", task2.Exception.InnerExceptions[0].Message); } + + [Fact] + public async Task CompletingWithExceptionDoesNotAffectState() + { + Pipe.Writer.Complete(); + Pipe.Writer.Complete(new Exception()); + + var result = await Pipe.Reader.ReadAsync(); + Assert.True(result.IsCompleted); + } + + [Fact] + public async Task CompletingWithExceptionDoesNotAffectFailedState() + { + Pipe.Writer.Complete(new InvalidOperationException()); + Pipe.Writer.Complete(new Exception()); + + await Assert.ThrowsAsync(async () => await Pipe.Reader.ReadAsync()); + } + + [Fact] + public async Task CompletingWithoutExceptionDoesNotAffectState() + { + Pipe.Writer.Complete(new InvalidOperationException()); + Pipe.Writer.Complete(); + + await Assert.ThrowsAsync(async () => await Pipe.Reader.ReadAsync()); + } } } diff --git a/src/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs b/src/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs index f25f08b2a22c..6f1117909b8d 100644 --- a/src/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs +++ b/src/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs @@ -9,6 +9,7 @@ using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Text; +using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -238,6 +239,8 @@ void ThrowTestException() invalidOperationException = await Assert.ThrowsAsync(async () => await _pipe.Writer.FlushAsync()); Assert.Equal("Reader exception", invalidOperationException.Message); Assert.Contains("ThrowTestException", invalidOperationException.StackTrace); + + Assert.Single(Regex.Matches(invalidOperationException.StackTrace, "Pipe.GetFlushResult")); } [Fact] @@ -304,6 +307,8 @@ void ThrowTestException() invalidOperationException = await Assert.ThrowsAsync(async () => await _pipe.Reader.ReadAsync()); Assert.Equal("Writer exception", invalidOperationException.Message); Assert.Contains("ThrowTestException", invalidOperationException.StackTrace); + + Assert.Single(Regex.Matches(invalidOperationException.StackTrace, "Pipe.GetReadResult")); } [Fact] diff --git a/src/System.IO.Pipelines/tests/ReadAsyncCompletionTests.cs b/src/System.IO.Pipelines/tests/ReadAsyncCompletionTests.cs index fc11059b0024..cd418a4292c1 100644 --- a/src/System.IO.Pipelines/tests/ReadAsyncCompletionTests.cs +++ b/src/System.IO.Pipelines/tests/ReadAsyncCompletionTests.cs @@ -30,5 +30,33 @@ async Task Await(ValueTask a) Assert.Equal(true, task2.IsFaulted); Assert.Equal("Concurrent reads or writes are not supported.", task2.Exception.InnerExceptions[0].Message); } + + [Fact] + public async Task CompletingWithExceptionDoesNotAffectState() + { + Pipe.Reader.Complete(); + Pipe.Reader.Complete(new Exception()); + + var result = await Pipe.Writer.FlushAsync(); + Assert.True(result.IsCompleted); + } + + [Fact] + public async Task CompletingWithExceptionDoesNotAffectFailedState() + { + Pipe.Reader.Complete(new InvalidOperationException()); + Pipe.Reader.Complete(new Exception()); + + await Assert.ThrowsAsync(async () => await Pipe.Writer.FlushAsync()); + } + + [Fact] + public async Task CompletingWithoutExceptionDoesNotAffectState() + { + Pipe.Reader.Complete(new InvalidOperationException()); + Pipe.Reader.Complete(); + + await Assert.ThrowsAsync(async () => await Pipe.Writer.FlushAsync()); + } } }