Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,28 @@ namespace System.IO.Pipelines
internal struct PipeCompletion
{
private static readonly ArrayPool<PipeCompletionCallback> s_completionCallbackPool = ArrayPool<PipeCompletionCallback>.Shared;
private static readonly Exception s_completedNoException = new Exception();

private const int InitialCallbacksSize = 1;

private Exception _exception;
private bool _isCompleted;
private ExceptionDispatchInfo _exceptionInfo;

private PipeCompletionCallback[] _callbacks;
private int _callbackCount;

public bool IsCompleted => _exception != null;
public bool IsCompleted => _isCompleted;

public bool IsFaulted => IsCompleted && _exception != s_completedNoException;
public bool IsFaulted => _exceptionInfo != null;

public PipeCompletionCallbacks TryComplete(Exception exception = null)
{
if (_exception == null)
if (!_isCompleted)
{
// Set the exception object to the exception passed in or a sentinel value
_exception = exception ?? s_completedNoException;
_isCompleted = true;
if (exception != null)
{
_exceptionInfo = ExceptionDispatchInfo.Capture(exception);
}
}
return GetCallbacks();
}
Expand Down Expand Up @@ -68,12 +71,12 @@ public PipeCompletionCallbacks AddCallback(Action<Exception, object> callback, o
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsCompletedOrThrow()
{
if (_exception == null)
if (!_isCompleted)
{
return false;
}

if (_exception != s_completedNoException)
if (_exceptionInfo != null)
{
ThrowLatchedException();
}
Expand All @@ -91,7 +94,7 @@ private PipeCompletionCallbacks GetCallbacks()

var callbacks = new PipeCompletionCallbacks(s_completionCallbackPool,
_callbackCount,
_exception == s_completedNoException ? null : _exception,
_exceptionInfo?.SourceException,
_callbacks);

_callbacks = null;
Expand All @@ -103,13 +106,14 @@ public void Reset()
{
Debug.Assert(IsCompleted);
Debug.Assert(_callbacks == null);
_exception = null;
_isCompleted = false;
_exceptionInfo = null;
}

[MethodImpl(MethodImplOptions.NoInlining)]
private void ThrowLatchedException()
{
ExceptionDispatchInfo.Capture(_exception).Throw();
_exceptionInfo.Throw();
}

public override string ToString()
Expand Down
28 changes: 28 additions & 0 deletions src/System.IO.Pipelines/tests/FlushAsyncCompletionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,33 @@ async Task Await(ValueTask<FlushResult> a)
Assert.Equal(true, task2.IsFaulted);
Assert.Equal("Concurrent reads or writes are not supported.", task2.Exception.InnerExceptions[0].Message);
}

[Fact]
public async Task CompletingWithExceptionDoesNotAffectState()
{
Pipe.Writer.Complete();
Pipe.Writer.Complete(new Exception());

var result = await Pipe.Reader.ReadAsync();
Assert.True(result.IsCompleted);
}

[Fact]
public async Task CompletingWithExceptionDoesNotAffectFailedState()
{
Pipe.Writer.Complete(new InvalidOperationException());
Pipe.Writer.Complete(new Exception());

await Assert.ThrowsAsync<InvalidOperationException>(async () => await Pipe.Reader.ReadAsync());
}

[Fact]
public async Task CompletingWithoutExceptionDoesNotAffectState()
{
Pipe.Writer.Complete(new InvalidOperationException());
Pipe.Writer.Complete();

await Assert.ThrowsAsync<InvalidOperationException>(async () => await Pipe.Reader.ReadAsync());
}
}
}
5 changes: 5 additions & 0 deletions src/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -238,6 +239,8 @@ void ThrowTestException()
invalidOperationException = await Assert.ThrowsAsync<InvalidOperationException>(async () => await _pipe.Writer.FlushAsync());
Assert.Equal("Reader exception", invalidOperationException.Message);
Assert.Contains("ThrowTestException", invalidOperationException.StackTrace);

Assert.Single(Regex.Matches(invalidOperationException.StackTrace, "Pipe.GetFlushResult"));
}

[Fact]
Expand Down Expand Up @@ -304,6 +307,8 @@ void ThrowTestException()
invalidOperationException = await Assert.ThrowsAsync<InvalidOperationException>(async () => await _pipe.Reader.ReadAsync());
Assert.Equal("Writer exception", invalidOperationException.Message);
Assert.Contains("ThrowTestException", invalidOperationException.StackTrace);

Assert.Single(Regex.Matches(invalidOperationException.StackTrace, "Pipe.GetReadResult"));
}

[Fact]
Expand Down
28 changes: 28 additions & 0 deletions src/System.IO.Pipelines/tests/ReadAsyncCompletionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,33 @@ async Task Await(ValueTask<ReadResult> a)
Assert.Equal(true, task2.IsFaulted);
Assert.Equal("Concurrent reads or writes are not supported.", task2.Exception.InnerExceptions[0].Message);
}

[Fact]
public async Task CompletingWithExceptionDoesNotAffectState()
{
Pipe.Reader.Complete();
Pipe.Reader.Complete(new Exception());

var result = await Pipe.Writer.FlushAsync();
Assert.True(result.IsCompleted);
}

[Fact]
public async Task CompletingWithExceptionDoesNotAffectFailedState()
{
Pipe.Reader.Complete(new InvalidOperationException());
Pipe.Reader.Complete(new Exception());

await Assert.ThrowsAsync<InvalidOperationException>(async () => await Pipe.Writer.FlushAsync());
}

[Fact]
public async Task CompletingWithoutExceptionDoesNotAffectState()
{
Pipe.Reader.Complete(new InvalidOperationException());
Pipe.Reader.Complete();

await Assert.ThrowsAsync<InvalidOperationException>(async () => await Pipe.Writer.FlushAsync());
}
}
}