Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@
<data name="NoReadingOperationToComplete" xml:space="preserve">
<value>No reading operation to complete.</value>
</data>
<data name="NoWritingOperationToAdvance" xml:space="preserve">
<value>No writing operation to advance.</value>
</data>
<data name="ReadCanceledOnPipeReader" xml:space="preserve">
<value>Read was canceled on underlying PipeReader.</value>
</data>
Expand Down
118 changes: 61 additions & 57 deletions src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ public sealed partial class Pipe
// The number of bytes flushed but not consumed by the reader
private long _unconsumedBytes;

// The number of bytes written but not flushed
private long _unflushedBytes;

private PipeAwaitable _readerAwaitable;
private PipeAwaitable _writerAwaitable;

Expand All @@ -78,8 +75,6 @@ public sealed partial class Pipe

// The write head which is the extent of the PipeWriter's written bytes
private BufferSegment? _writingHead;
private Memory<byte> _writingHeadMemory;
private int _writingHeadBytesBuffered;

// Determines what current operation is in flight (reading/writing)
private PipeOperationState _operationState;
Expand Down Expand Up @@ -135,7 +130,7 @@ private void ResetState()
_readTailIndex = 0;
_readHeadIndex = 0;
_lastExaminedIndex = -1;
_unflushedBytes = 0;
_operationState.UnflushedBytes = 0;
_unconsumedBytes = 0;
}

Expand All @@ -153,7 +148,7 @@ internal Memory<byte> GetMemory(int sizeHint)

AllocateWriteHeadIfNeeded(sizeHint);

return _writingHeadMemory;
return _operationState.WritingHeadMemory;
}

internal Span<byte> GetSpan(int sizeHint)
Expand All @@ -170,7 +165,7 @@ internal Span<byte> GetSpan(int sizeHint)

AllocateWriteHeadIfNeeded(sizeHint);

return _writingHeadMemory.Span;
return _operationState.WritingHeadMemory.Span;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -179,38 +174,49 @@ private void AllocateWriteHeadIfNeeded(int sizeHint)
// If writing is currently active and enough space, don't need to take the lock to just set WritingActive.
// IsWritingActive is needed to prevent the reader releasing the writers memory when it fully consumes currently written.
if (!_operationState.IsWritingActive ||
_writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint)
_operationState.WritingHeadMemory.Length == 0 || _operationState.WritingHeadMemory.Length < sizeHint)
{
AllocateWriteHeadSynchronized(sizeHint);
// Set wrtiting before checking anything; this is important as AdvanceReader can release the write head.
_operationState.BeginWrite();

if (!_operationState.IsReadingActive && _writingHead == null)
{
// No reader and no existing data; so we don't need to take the lock.
BufferSegment newSegment = AllocateSegment(sizeHint);
// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
_lastExaminedIndex = 0;
}
else
{
AllocateWriteHeadSynchronized(sizeHint);
}
}
}

private void AllocateWriteHeadSynchronized(int sizeHint)
{
lock (_sync)
{
_operationState.BeginWrite();

if (_writingHead == null)
{
// We need to allocate memory to write since nobody has written before
// No existing write head so we need to initialize everything.
BufferSegment newSegment = AllocateSegment(sizeHint);

// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
_lastExaminedIndex = 0;
}
else
{
int bytesLeftInBuffer = _writingHeadMemory.Length;
int bytesLeftInBuffer = _operationState.WritingHeadMemory.Length;

if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
{
if (_writingHeadBytesBuffered > 0)
if (_operationState.WritingHeadBytesBuffered > 0)
{
// Flush buffered data to the segment
_writingHead.End += _writingHeadBytesBuffered;
_writingHeadBytesBuffered = 0;
_writingHead.End += _operationState.WritingHeadBytesBuffered;
_operationState.WritingHeadBytesBuffered = 0;
}

BufferSegment newSegment = AllocateSegment(sizeHint);
Expand All @@ -230,7 +236,8 @@ private BufferSegment AllocateSegment(int sizeHint)
if (_pool != null && sizeHint <= maxSize)
{
// Use the specified pool as it fits
newSegment.SetOwnedMemory(_pool.Rent(GetSegmentSize(sizeHint, maxSize)));
int sizeToRequest = GetSegmentSize(sizeHint, maxSize);
newSegment.SetOwnedMemory(_pool.Rent(sizeToRequest));
}
else
{
Expand All @@ -239,7 +246,7 @@ private BufferSegment AllocateSegment(int sizeHint)
newSegment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(sizeToRequest));
}

_writingHeadMemory = newSegment.AvailableMemory;
_operationState.WritingHeadMemory = newSegment.AvailableMemory;

return newSegment;
}
Expand Down Expand Up @@ -279,22 +286,22 @@ internal bool CommitUnsynchronized()
{
_operationState.EndWrite();

if (_unflushedBytes == 0)
if (_operationState.UnflushedBytes == 0)
{
// Nothing written to commit
return true;
}

// Update the writing head
Debug.Assert(_writingHead != null);
_writingHead.End += _writingHeadBytesBuffered;
_writingHead.End += _operationState.WritingHeadBytesBuffered;

// Always move the read tail to the write head
_readTail = _writingHead;
_readTailIndex = _writingHead.End;

long oldLength = _unconsumedBytes;
_unconsumedBytes += _unflushedBytes;
_unconsumedBytes += _operationState.UnflushedBytes;

// Do not reset if reader is complete
if (_pauseWriterThreshold > 0 &&
Expand All @@ -305,31 +312,33 @@ internal bool CommitUnsynchronized()
_writerAwaitable.SetUncompleted();
}

_unflushedBytes = 0;
_writingHeadBytesBuffered = 0;
_operationState.UnflushedBytes = 0;
_operationState.WritingHeadBytesBuffered = 0;

return false;
}

internal void Advance(int bytes)
{
lock (_sync)
if ((uint)bytes > (uint)_operationState.WritingHeadMemory.Length)
{
if ((uint)bytes > (uint)_writingHeadMemory.Length)
{
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
}

AdvanceCore(bytes);
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
}

AdvanceCore(bytes);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AdvanceCore(int bytesWritten)
{
_unflushedBytes += bytesWritten;
_writingHeadBytesBuffered += bytesWritten;
_writingHeadMemory = _writingHeadMemory.Slice(bytesWritten);
if (!_operationState.IsWritingActive)
{
ThrowHelper.ThrowInvalidOperationException_NoWriteToAdvance();
}

_operationState.UnflushedBytes += bytesWritten;
_operationState.WritingHeadBytesBuffered += bytesWritten;
_operationState.WritingHeadMemory = _operationState.WritingHeadMemory.Slice(bytesWritten);
}

internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
Expand All @@ -338,17 +347,17 @@ internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
ValueTask<FlushResult> result;
lock (_sync)
{
PrepareFlush(out completionData, out result, cancellationToken);
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}

TrySchedule(_readerScheduler, completionData);

return result;
}

private void PrepareFlush(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
{
var wasEmpty = CommitUnsynchronized();
bool wasEmpty = CommitUnsynchronized();

// AttachToken before completing reader awaiter in case cancellationToken is already completed
_writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
Expand Down Expand Up @@ -438,14 +447,12 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
}

BufferSegment? returnStart = null;
BufferSegment? returnEnd = null;

CompletionData completionData = default;

lock (_sync)
{
var examinedEverything = false;
bool examinedEverything = false;
if (examinedSegment == _readTail)
{
examinedEverything = examinedIndex == _readTailIndex;
Expand Down Expand Up @@ -475,6 +482,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
}
}

BufferSegment? returnStart = null;
BufferSegment? returnEnd = null;
if (consumedSegment != null)
{
if (_readHead == null)
Expand Down Expand Up @@ -511,11 +520,11 @@ void MoveReturnEndToNextBlock()
}
// If the writing head is the same as the block to be returned, then we need to make sure
// there's no pending write and that there's no buffered data for the writing head
else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive)
else if (_operationState.WritingHeadBytesBuffered == 0 && !_operationState.IsWritingActive)
{
// Reset the writing head to null if it's the return block and we've consumed everything
_writingHead = null;
_writingHeadMemory = default;
_operationState.WritingHeadMemory = default;

MoveReturnEndToNextBlock();
}
Expand Down Expand Up @@ -655,7 +664,6 @@ internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}

ValueTask<ReadResult> result;
lock (_sync)
{
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
Expand All @@ -664,16 +672,12 @@ internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
if (_readerAwaitable.IsCompleted)
{
GetReadResult(out ReadResult readResult);
result = new ValueTask<ReadResult>(readResult);
}
else
{
// Otherwise it's async
result = new ValueTask<ReadResult>(_reader, token: 0);
return new ValueTask<ReadResult>(readResult);
}
}

return result;
// Otherwise it's async
return new ValueTask<ReadResult>(_reader, token: 0);
}

internal bool TryRead(out ReadResult result)
Expand Down Expand Up @@ -960,9 +964,9 @@ internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, Cancella
// state as writing
AllocateWriteHeadIfNeeded(0);

if (source.Length <= _writingHeadMemory.Length)
if (source.Length <= _operationState.WritingHeadMemory.Length)
{
source.CopyTo(_writingHeadMemory);
source.CopyTo(_operationState.WritingHeadMemory);

AdvanceCore(source.Length);
}
Expand All @@ -972,7 +976,7 @@ internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, Cancella
WriteMultiSegment(source.Span);
}

PrepareFlush(out completionData, out result, cancellationToken);
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}

TrySchedule(_readerScheduler, completionData);
Expand All @@ -982,7 +986,7 @@ internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, Cancella
private void WriteMultiSegment(ReadOnlySpan<byte> source)
{
Debug.Assert(_writingHead != null);
Span<byte> destination = _writingHeadMemory.Span;
Span<byte> destination = _operationState.WritingHeadMemory.Span;

while (true)
{
Expand All @@ -998,7 +1002,7 @@ private void WriteMultiSegment(ReadOnlySpan<byte> source)

// We filled the segment
_writingHead.End += writable;
_writingHeadBytesBuffered = 0;
_operationState.WritingHeadBytesBuffered = 0;

// This is optimized to use pooled memory. That's why we pass 0 instead of
// source.Length
Expand All @@ -1007,7 +1011,7 @@ private void WriteMultiSegment(ReadOnlySpan<byte> source)
_writingHead.SetNext(newSegment);
_writingHead = newSegment;

destination = _writingHeadMemory.Span;
destination = _operationState.WritingHeadMemory.Span;
}
}

Expand Down
Loading