Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ protected PipeWriter() { }
protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public static System.IO.Pipelines.PipeWriter Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeWriterOptions? writerOptions = null) { throw null; }
public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
public virtual System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(bool isMoreData, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public abstract System.Memory<byte> GetMemory(int sizeHint = 0);
public abstract System.Span<byte> GetSpan(int sizeHint = 0);
[System.ObsoleteAttribute("OnReaderCompleted may not be invoked on all implementations of PipeWriter. This will be removed in a future release.")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ public void SetOwnedMemory(byte[] arrayPoolBuffer)
AvailableMemory = arrayPoolBuffer;
}

public IMemoryOwner<byte>? GetMemoryBlockAndResetSegment()
{
IMemoryOwner<byte>? memoryOwner = _memoryOwner;
if (memoryOwner != null)
{
_memoryOwner = null;
// We return the owner rather than disposing
}
else
{
Debug.Assert(_array != null);
ArrayPool<byte>.Shared.Return(_array);
_array = null;
}

Next = null;
RunningIndex = 0;
Memory = default;
_next = null;
_end = 0;
AvailableMemory = default;

return memoryOwner;
}

public void ResetMemory()
{
IMemoryOwner<byte>? memoryOwner = _memoryOwner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Threading;

namespace System.IO.Pipelines
{
/// <summary>
/// Result returned by <see cref="PipeWriter.FlushAsync"/> call
/// Result returned by <see cref="PipeWriter.FlushAsync(CancellationToken)"/> call
/// </summary>
public struct FlushResult
{
Expand All @@ -30,7 +32,7 @@ public FlushResult(bool isCanceled, bool isCompleted)
}

/// <summary>
/// True if the current <see cref="PipeWriter.FlushAsync"/> operation was canceled, otherwise false.
/// True if the current <see cref="PipeWriter.FlushAsync(CancellationToken)"/> operation was canceled, otherwise false.
/// </summary>
public bool IsCanceled => (_resultFlags & ResultFlags.Canceled) != 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ public DefaultPipeWriter(Pipe pipe)
public override void OnReaderCompleted(Action<Exception?, object?> callback, object? state) => _pipe.OnReaderCompleted(callback, state);
#pragma warning restore CS0672 // Member overrides obsolete member

public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) => _pipe.FlushAsync(cancellationToken);
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default) => _pipe.FlushAsync(isMoreData: false, cancellationToken);

public override ValueTask<FlushResult> FlushAsync(bool isMoreData, CancellationToken cancellationToken = default) => _pipe.FlushAsync(isMoreData, cancellationToken);

public override void Advance(int bytes) => _pipe.Advance(bytes);

Expand Down
176 changes: 130 additions & 46 deletions src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,26 +176,79 @@ internal Span<byte> GetSpan(int sizeHint)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AllocateWriteHeadIfNeeded(int sizeHint)
{
// If writing is currently active and enough space, don't need to take the lock to just set WritingActive.
// IsWritingActive is needed to prevent the reader releasing the writers memory when it fully consumes currently written.
if (!_operationState.IsWritingActive ||
_writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint)
// The very first operations we do are set IsWriting and check IsReading
// as they are our guards and we may be in a data race to avoid taking a lock.

// Set IsWrtiting before checking anything; this is important as AdvanceReader
// can release the write head to free up memory if it considers the Pipe to be idle.
if (_operationState.SetWritingIfNotWriting())
{
// Writing wasn't active; we need to be careful.
if (!_operationState.IsReadingActive)
{
int bytesLeftInBuffer = _writingHeadMemory.Length;
if (bytesLeftInBuffer > 0 && bytesLeftInBuffer >= sizeHint)
{
// Reading wasn't active and we already have enough memory,
// so we can return directly.
return;
}

if (_writingHead == null)
{
// If the Reader wasn't active after setting Writing active there will
// be no Read head so we are fully uncontended on BufferSegments, but
// we need to set everything up.
Debug.Assert(_readHead == null && _readTail == null, "Read heads can't be allocated at this point");

BufferSegment newSegment = AllocateSegment(sizeHint);
// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
_lastExaminedIndex = 0;
}
else
{
// Not enough room in current buffer; we need to rent a new BufferSegment,
// since the pipe is active we need a lock for that.
// (BufferSegment pool is shared and not guarded by the IsWriting/IsReading flags).
AllocateWriteHeadSynchronized(sizeHint);
}
}
else
{
// Writing wasn't active and overlapped with Read activity; we need to take a lock,
// to protect the Read/Write heads.
AllocateWriteHeadSynchronized(sizeHint);
}
}
else
{
AllocateWriteHeadSynchronized(sizeHint);
Debug.Assert(_writingHead != null, "Writing can't be active without a Write head");
// Writing was already active so we are safe; except if we need a new BufferSegment.
int bytesLeftInBuffer = _writingHeadMemory.Length;
if (bytesLeftInBuffer > 0 && bytesLeftInBuffer >= sizeHint)
{
// We already have enough memory, so we can return directly.
return;
}
else
{
// Not enough room in current buffer; we need to rent a new BufferSegment,
// since the pipe was active we need a lock for that.
// (BufferSegment pool is shared and not guarded by the IsWriting/IsReading flags).
AllocateWriteHeadSynchronized(sizeHint);
}
}
}

private void AllocateWriteHeadSynchronized(int sizeHint)
{
lock (_sync)
{
_operationState.BeginWrite();

if (_writingHead == null)
{
// We need to allocate memory to write since nobody has written before
// No existing write head so we need to initialize everything.
BufferSegment newSegment = AllocateSegment(sizeHint);

// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
_lastExaminedIndex = 0;
Expand Down Expand Up @@ -275,9 +328,13 @@ private void ReturnSegmentUnsynchronized(BufferSegment segment)
}
}

internal bool CommitUnsynchronized()
internal bool CommitUnsynchronized(bool isMoreData)
{
_operationState.EndWrite();
// If there is more data, don't end the Write
if (!isMoreData)
{
_operationState.EndWrite();
}

if (_unflushedBytes == 0)
{
Expand Down Expand Up @@ -313,15 +370,15 @@ internal bool CommitUnsynchronized()

internal void Advance(int bytes)
{
lock (_sync)
// Advance is not under lock, so Write must be active or Read could release
// the write head to free up idle memory before we have increased the unflushed data.
// We must have set IsWritingActive prior to adjusting the Write segments as it is a guard.
if (!_operationState.IsWritingActive || (uint)bytes > (uint)_writingHeadMemory.Length)
{
if ((uint)bytes > (uint)_writingHeadMemory.Length)
{
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
}

AdvanceCore(bytes);
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
}

AdvanceCore(bytes);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -332,23 +389,23 @@ private void AdvanceCore(int bytesWritten)
_writingHeadMemory = _writingHeadMemory.Slice(bytesWritten);
}

internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
internal ValueTask<FlushResult> FlushAsync(bool isMoreData, CancellationToken cancellationToken)
{
CompletionData completionData;
ValueTask<FlushResult> result;
lock (_sync)
{
PrepareFlush(out completionData, out result, cancellationToken);
PrepareFlush(isMoreData, out completionData, out result, cancellationToken);
}

TrySchedule(_readerScheduler, completionData);

return result;
}

private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
private void PrepareFlush(bool isMoreData, out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
{
var wasEmpty = CommitUnsynchronized();
var wasEmpty = CommitUnsynchronized(isMoreData);

// AttachToken before completing reader awaiter in case cancellationToken is already completed
_writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
Expand Down Expand Up @@ -392,7 +449,7 @@ internal void CompleteWriter(Exception? exception)
lock (_sync)
{
// Commit any pending buffers
CommitUnsynchronized();
CommitUnsynchronized(isMoreData: false);

completionCallbacks = _writerCompletion.TryComplete(exception);
_readerAwaitable.Complete(out completionData);
Expand Down Expand Up @@ -438,11 +495,10 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
}

BufferSegment? returnStart = null;
BufferSegment? returnEnd = null;

CompletionData completionData = default;

IMemoryOwner<byte>? blockToReturn = null;
bool success;
lock (_sync)
{
var examinedEverything = false;
Expand Down Expand Up @@ -475,6 +531,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
}
}

BufferSegment? returnStart = null;
BufferSegment? returnEnd = null;
if (consumedSegment != null)
{
if (_readHead == null)
Expand Down Expand Up @@ -511,6 +569,7 @@ void MoveReturnEndToNextBlock()
}
// If the writing head is the same as the block to be returned, then we need to make sure
// there's no pending write and that there's no buffered data for the writing head
// We must check IsWritingActive prior to touching the writing segments as it is a guard.
else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive)
{
// Reset the writing head to null if it's the return block and we've consumed everything
Expand Down Expand Up @@ -541,18 +600,48 @@ void MoveReturnEndToNextBlock()
_readerAwaitable.SetUncompleted();
}

while (returnStart != null && returnStart != returnEnd)
if (returnStart != null && returnStart != returnEnd)
{
BufferSegment? next = returnStart.NextSegment;
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
if (next == null || next == returnEnd)
{
// Fast-path, single block to return; we will return the block outside of the lock.
blockToReturn = returnStart.GetMemoryBlockAndResetSegment();
ReturnSegmentUnsynchronized(returnStart);
}
else
{
// Multiple blocks to return; we will do it inside of lock.
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
do
{
next = returnStart.NextSegment;
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
} while (returnStart != null && returnStart != returnEnd) ;
}
}

_operationState.EndRead();
success = _operationState.TryEndRead();
}

TrySchedule(_writerScheduler, completionData);
// Return the block before throwing the exception if there is one.
if (blockToReturn != null)
{
blockToReturn.Dispose();
}

if (success)
{
TrySchedule(_writerScheduler, completionData);
}
else
{
ThrowHelper.ThrowInvalidOperationException_NoReadToComplete();
}
}

internal void CompleteReader(Exception? exception)
Expand Down Expand Up @@ -655,7 +744,6 @@ internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}

ValueTask<ReadResult> result;
lock (_sync)
{
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
Expand All @@ -664,16 +752,12 @@ internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
if (_readerAwaitable.IsCompleted)
{
GetReadResult(out ReadResult readResult);
result = new ValueTask<ReadResult>(readResult);
}
else
{
// Otherwise it's async
result = new ValueTask<ReadResult>(_reader, token: 0);
return new ValueTask<ReadResult>(readResult);
}
}

return result;
// Otherwise it's async
return new ValueTask<ReadResult>(_reader, token: 0);
}

internal bool TryRead(out ReadResult result)
Expand Down Expand Up @@ -946,16 +1030,16 @@ private void GetFlushResult(ref FlushResult result)

internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
if (_writerCompletion.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}

CompletionData completionData;
ValueTask<FlushResult> result;

lock (_sync)
{
if (_writerCompletion.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}

// Allocate whatever the pool gives us so we can write, this also marks the
// state as writing
AllocateWriteHeadIfNeeded(0);
Expand All @@ -972,7 +1056,7 @@ internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, Cancella
WriteMultiSegment(source.Span);
}

PrepareFlush(out completionData, out result, cancellationToken);
PrepareFlush(isMoreData: false, out completionData, out result, cancellationToken);
}

TrySchedule(_readerScheduler, completionData);
Expand Down
Loading