Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,19 @@ internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
// AttachToken before completing reader awaiter in case cancellationToken is already completed
cancellationTokenRegistration = _writerAwaitable.AttachToken(cancellationToken, s_signalWriterAwaitable, this);

// If the writer is completed (which it will be most of the time) then return a completed ValueTask
if (_writerAwaitable.IsCompleted)
{
var flushResult = new FlushResult();
GetFlushResult(ref flushResult);
result = new ValueTask<FlushResult>(flushResult);
}
else
{
// Otherwise it's async
result = new ValueTask<FlushResult>(_writer, token: 0);
}

// Complete reader only if new data was pushed into the pipe
if (!wasEmpty)
{
Expand All @@ -349,19 +362,6 @@ internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
// I couldn't find a way for flush to induce backpressure deadlock
// if it always adds new data to pipe and wakes up the reader but assert anyway
Debug.Assert(_writerAwaitable.IsCompleted || _readerAwaitable.IsCompleted);

// If the writer is completed (which it will be most of the time) the return a completed ValueTask
if (_writerAwaitable.IsCompleted)
{
var flushResult = new FlushResult();
GetFlushResult(ref flushResult);
result = new ValueTask<FlushResult>(flushResult);
}
else
{
// Otherwise it's async
result = new ValueTask<FlushResult>(_writer, token: 0);
}
}

cancellationTokenRegistration.Dispose();
Expand Down
13 changes: 11 additions & 2 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,23 @@ public CancellationTokenRegistration AttachToken(CancellationToken cancellationT
public void Complete(out CompletionData completionData)
{
Action<object> currentCompletion = _completion;
object currentState = _completionState;

_completion = s_awaitableIsCompleted;
_completionState = null;

completionData = default;

if (!ReferenceEquals(currentCompletion, s_awaitableIsCompleted) &&
!ReferenceEquals(currentCompletion, s_awaitableIsNotCompleted))
{
completionData = new CompletionData(currentCompletion, _completionState, _executionContext, _synchronizationContext);
completionData = new CompletionData(currentCompletion, currentState, _executionContext, _synchronizationContext);
}
else if (_canceledState == CanceledState.CancellationRequested)
{
// Make sure we won't reset the awaitable in ObserveCancellation
// If Complete happens in between Cancel and ObserveCancellation
_canceledState = CanceledState.CancellationPreRequested;
}
}

Expand Down
75 changes: 75 additions & 0 deletions src/System.IO.Pipelines/tests/FlushAsyncCancellationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,81 @@ public async Task FlushAsyncThrowsIfPassedCanceledCancellationTokenAndPipeIsAble
Assert.True(task.IsCompleted);
Assert.True(task.Result.IsCompleted);
}

[Fact]
public void ReadAsyncCompletesIfFlushAsyncCanceledMidFlush()
{
// This test tries to get pipe into a state where ReadAsync is being awaited
// and FlushAsync is cancelled while the method is running
Pipe = new Pipe();
var resetEvent = new ManualResetEvent(false);

var cancellationTokenSource = new CancellationTokenSource();
var writer = Task.Run(async () =>
{
var cancellations = 0;
while (cancellations < 20)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason for 20?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, just a random number.

{
try
{
// We want reader to be awaiting
resetEvent.WaitOne();
Pipe.Writer.WriteEmpty(1);

// We want the token to be cancelled during FlushAsync call
// check it it's already cancelled and try a new one
if (cancellationTokenSource.Token.IsCancellationRequested)
{
cancellationTokenSource = new CancellationTokenSource();
continue;
}

await Pipe.Writer.FlushAsync(cancellationTokenSource.Token);
}
catch (OperationCanceledException)
{
cancellations ++;
}
}

Pipe.Writer.Complete();
return;
});

var reader = Task.Run(async () =>
{
while (true)
{
var readTask = Pipe.Reader.ReadAsync();

// Signal writer to initiate a flush
if (!readTask.IsCompleted)
{
resetEvent.Set();
}

var result = await readTask;
resetEvent.Reset();

if (result.Buffer.IsEmpty)
{
return;
}

Pipe.Reader.AdvanceTo(result.Buffer.End);
}
});

var canceller = Task.Run(() =>
{
while (!writer.IsCompleted)
{
cancellationTokenSource.Cancel();
}
});

Assert.True(Task.WaitAll(new [] { writer, reader, canceller }, TimeSpan.FromSeconds(30)), "Reader was not completed in reasonable time");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't potentially spending 30 seconds on a single unit test too long? Does the test still work as intended if we change this to < 5 seconds?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only if fails. I don't want to introduce the test that would be flaky on slower machines. In the successful case, it passes quickly.

}
}

public static class TestWriterExtensions
Expand Down