diff --git a/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx b/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
index 1801eb4fbe37e6..f9b36abec974c5 100644
--- a/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
+++ b/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
@@ -141,6 +141,9 @@
No reading operation to complete.
+
+ No writing operation to advance.
+
Read was canceled on underlying PipeReader.
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs
index 51b85733017382..472baf1f8436fb 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs
@@ -3,9 +3,7 @@
// See the LICENSE file in the project root for more information.
using System;
-using System.Collections.Generic;
using System.Runtime.CompilerServices;
-using System.Text;
using System.Diagnostics.CodeAnalysis;
namespace System.IO.Pipelines
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
index b92832c053dad5..04d9d64f741db0 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
@@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information.
using System.Buffers;
-using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
@@ -41,19 +40,15 @@ public sealed partial class Pipe
private readonly PipeScheduler _readerScheduler;
private readonly PipeScheduler _writerScheduler;
- // Mutable struct! Don't make this readonly
- private BufferSegmentStack _bufferSegmentPool;
-
private readonly DefaultPipeReader _reader;
private readonly DefaultPipeWriter _writer;
private readonly bool _useSynchronizationContext;
+ private readonly int _maxPooledBufferSize;
- // The number of bytes flushed but not consumed by the reader
- private long _unconsumedBytes;
-
- // The number of bytes written but not flushed
- private long _unflushedBytes;
+ // Mutable struct! Don't make this readonly
+ private BufferSegmentStack _bufferSegmentPool;
+ private BufferSegment? _pooledBufferSegment;
private PipeAwaitable _readerAwaitable;
private PipeAwaitable _writerAwaitable;
@@ -61,31 +56,16 @@ public sealed partial class Pipe
private PipeCompletion _writerCompletion;
private PipeCompletion _readerCompletion;
- // Stores the last examined position, used to calculate how many bytes were to release
- // for back pressure management
- private long _lastExaminedIndex = -1;
-
- // The read head which is the start of the PipeReader's consumed bytes
- private BufferSegment? _readHead;
- private int _readHeadIndex;
-
- private readonly int _maxPooledBufferSize;
private bool _disposed;
- // The extent of the bytes available to the PipeReader to consume
- private BufferSegment? _readTail;
- private int _readTailIndex;
-
- // The write head which is the extent of the PipeWriter's written bytes
- private BufferSegment? _writingHead;
- private Memory _writingHeadMemory;
- private int _writingHeadBytesBuffered;
+ private static readonly Exception s_exceptionSentinel = new Exception();
+ private volatile Exception? _writerCompletionException;
// Determines what current operation is in flight (reading/writing)
private PipeOperationState _operationState;
- internal long Length => _unconsumedBytes;
+ internal long Length => _operationState.UnconsumedBytes;
///
/// Initializes the using as options.
@@ -107,6 +87,7 @@ public Pipe(PipeOptions options)
_bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
_operationState = default;
+ _operationState.LastExaminedIndex = -1;
_readerCompletion = default;
_writerCompletion = default;
@@ -132,19 +113,17 @@ private void ResetState()
_writerCompletion.Reset();
_readerAwaitable = new PipeAwaitable(completed: false, _useSynchronizationContext);
_writerAwaitable = new PipeAwaitable(completed: true, _useSynchronizationContext);
- _readTailIndex = 0;
- _readHeadIndex = 0;
- _lastExaminedIndex = -1;
- _unflushedBytes = 0;
- _unconsumedBytes = 0;
+ _writerCompletionException = null;
+ _operationState.ReadTailIndex = 0;
+ _operationState.ReadHeadIndex = 0;
+ _operationState.LastExaminedIndex = -1;
+ _operationState.UnflushedBytes = 0;
+ _operationState.UnconsumedBytes = 0;
}
internal Memory GetMemory(int sizeHint)
{
- if (_writerCompletion.IsCompleted)
- {
- ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
- }
+ ThrowIfWriterCompleted();
if (sizeHint < 0)
{
@@ -152,16 +131,14 @@ internal Memory GetMemory(int sizeHint)
}
AllocateWriteHeadIfNeeded(sizeHint);
+ ThrowIfWriterCompleted();
- return _writingHeadMemory;
+ return _operationState.WritingHeadMemory;
}
internal Span GetSpan(int sizeHint)
{
- if (_writerCompletion.IsCompleted)
- {
- ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
- }
+ ThrowIfWriterCompleted();
if (sizeHint < 0)
{
@@ -169,8 +146,18 @@ internal Span GetSpan(int sizeHint)
}
AllocateWriteHeadIfNeeded(sizeHint);
+ ThrowIfWriterCompleted();
+
+ return _operationState.WritingHeadMemory.Span;
+ }
- return _writingHeadMemory.Span;
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private void ThrowIfWriterCompleted()
+ {
+ if (_writerCompletion.IsCompleted)
+ {
+ ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
+ }
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -179,58 +166,117 @@ private void AllocateWriteHeadIfNeeded(int sizeHint)
// If writing is currently active and enough space, don't need to take the lock to just set WritingActive.
// IsWritingActive is needed to prevent the reader releasing the writers memory when it fully consumes currently written.
if (!_operationState.IsWritingActive ||
- _writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint)
+ _operationState.WritingHeadMemory.Length == 0 || _operationState.WritingHeadMemory.Length < sizeHint)
{
- AllocateWriteHeadSynchronized(sizeHint);
+ AllocateWriteHead(sizeHint);
}
}
- private void AllocateWriteHeadSynchronized(int sizeHint)
+ private void AllocateWriteHead(int sizeHint)
{
- lock (_sync)
+ // First set the Write active and block reader from returning the writer's blocks
+ _operationState.BeginWrite();
+ if (_writerCompletionException != null)
+ {
+ _operationState.EndWriteAllocation();
+ CompleteWriter();
+ return;
+ }
+
+ if (_operationState.WritingHead == null)
{
- _operationState.BeginWrite();
+ // We need to allocate memory to write since nobody has written before
+ BufferSegment newSegment = AllocateSegmentNoActiveReading(sizeHint);
- if (_writingHead == null)
+ // Set all the pointers
+ _operationState.WritingHead = _operationState.ReadHead = _operationState.ReadTail = newSegment;
+ _operationState.LastExaminedIndex = 0;
+ }
+ else
+ {
+ int bytesLeftInBuffer = _operationState.WritingHeadMemory.Length;
+
+ if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
{
- // We need to allocate memory to write since nobody has written before
+ if (_operationState.WritingHeadBytesBuffered > 0)
+ {
+ // Flush buffered data to the segment
+ _operationState.WritingHead.End += _operationState.WritingHeadBytesBuffered;
+ _operationState.WritingHeadBytesBuffered = 0;
+ }
+
BufferSegment newSegment = AllocateSegment(sizeHint);
- // Set all the pointers
- _writingHead = _readHead = _readTail = newSegment;
- _lastExaminedIndex = 0;
+ _operationState.WritingHead.SetNext(newSegment);
+ _operationState.WritingHead = newSegment;
}
- else
- {
- int bytesLeftInBuffer = _writingHeadMemory.Length;
+ }
- if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
- {
- if (_writingHeadBytesBuffered > 0)
- {
- // Flush buffered data to the segment
- _writingHead.End += _writingHeadBytesBuffered;
- _writingHeadBytesBuffered = 0;
- }
+ _operationState.EndWriteAllocation();
+ if (_writerCompletionException != null)
+ {
+ CompleteWriter();
+ }
+ }
- BufferSegment newSegment = AllocateSegment(sizeHint);
+ private BufferSegment AllocateSegment(int sizeHint)
+ {
+ BufferSegment? newSegment = Interlocked.Exchange(ref _pooledBufferSegment, null);
+ if (newSegment is null)
+ {
+ newSegment = CreateSegmentSynchronized();
+ }
- _writingHead.SetNext(newSegment);
- _writingHead = newSegment;
- }
+ InitializeSegmentMemory(sizeHint, newSegment);
+
+ return newSegment;
+ }
+
+ private BufferSegment AllocateSegmentNoActiveReading(int sizeHint)
+ {
+ Debug.Assert(_operationState.ReadHead == null, "Allocating unguarded segment when reader active!");
+ Debug.Assert(_operationState.ReadTail == null, "Allocating unguarded segment when reader active!");
+
+ BufferSegment? newSegment = _pooledBufferSegment;
+ if (newSegment is null)
+ {
+ if (!_bufferSegmentPool.TryPop(out newSegment))
+ {
+ newSegment = new BufferSegment();
}
}
+ else
+ {
+ _pooledBufferSegment = null;
+ }
+
+ InitializeSegmentMemory(sizeHint, newSegment);
+
+ return newSegment;
}
- private BufferSegment AllocateSegment(int sizeHint)
+ private BufferSegment CreateSegmentSynchronized()
{
- BufferSegment newSegment = CreateSegmentUnsynchronized();
+ lock (_sync)
+ {
+ if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
+ {
+ return segment;
+ }
+ }
+
+ return new BufferSegment();
+ }
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private void InitializeSegmentMemory(int sizeHint, BufferSegment newSegment)
+ {
int maxSize = _maxPooledBufferSize;
if (_pool != null && sizeHint <= maxSize)
{
// Use the specified pool as it fits
- newSegment.SetOwnedMemory(_pool.Rent(GetSegmentSize(sizeHint, maxSize)));
+ int sizeToRequest = GetSegmentSize(sizeHint, maxSize);
+ newSegment.SetOwnedMemory(_pool.Rent(sizeToRequest));
}
else
{
@@ -239,9 +285,7 @@ private BufferSegment AllocateSegment(int sizeHint)
newSegment.SetOwnedMemory(ArrayPool.Shared.Rent(sizeToRequest));
}
- _writingHeadMemory = newSegment.AvailableMemory;
-
- return newSegment;
+ _operationState.WritingHeadMemory = newSegment.AvailableMemory;
}
private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
@@ -253,21 +297,11 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
return adjustedToMaximumSize;
}
- private BufferSegment CreateSegmentUnsynchronized()
- {
- if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
- {
- return segment;
- }
-
- return new BufferSegment();
- }
-
private void ReturnSegmentUnsynchronized(BufferSegment segment)
{
- Debug.Assert(segment != _readHead, "Returning _readHead segment that's in use!");
- Debug.Assert(segment != _readTail, "Returning _readTail segment that's in use!");
- Debug.Assert(segment != _writingHead, "Returning _writingHead segment that's in use!");
+ Debug.Assert(segment != _operationState.ReadHead, "Returning ReadHead segment that's in use!");
+ Debug.Assert(segment != _operationState.ReadTail, "Returning ReadTail segment that's in use!");
+ Debug.Assert(segment != _operationState.WritingHead, "Returning WritingHead segment that's in use!");
if (_bufferSegmentPool.Count < MaxSegmentPoolSize)
{
@@ -279,57 +313,59 @@ internal bool CommitUnsynchronized()
{
_operationState.EndWrite();
- if (_unflushedBytes == 0)
+ if (_operationState.UnflushedBytes == 0)
{
// Nothing written to commit
return true;
}
// Update the writing head
- Debug.Assert(_writingHead != null);
- _writingHead.End += _writingHeadBytesBuffered;
+ Debug.Assert(_operationState.WritingHead != null);
+ _operationState.WritingHead.End += _operationState.WritingHeadBytesBuffered;
// Always move the read tail to the write head
- _readTail = _writingHead;
- _readTailIndex = _writingHead.End;
+ _operationState.ReadTail = _operationState.WritingHead;
+ _operationState.ReadTailIndex = _operationState.WritingHead.End;
- long oldLength = _unconsumedBytes;
- _unconsumedBytes += _unflushedBytes;
+ long oldLength = _operationState.UnconsumedBytes;
+ _operationState.UnconsumedBytes += _operationState.UnflushedBytes;
// Do not reset if reader is complete
if (_pauseWriterThreshold > 0 &&
oldLength < _pauseWriterThreshold &&
- _unconsumedBytes >= _pauseWriterThreshold &&
+ _operationState.UnconsumedBytes >= _pauseWriterThreshold &&
!_readerCompletion.IsCompleted)
{
_writerAwaitable.SetUncompleted();
}
- _unflushedBytes = 0;
- _writingHeadBytesBuffered = 0;
+ _operationState.UnflushedBytes = 0;
+ _operationState.WritingHeadBytesBuffered = 0;
return false;
}
internal void Advance(int bytes)
{
- lock (_sync)
+ if ((uint)bytes > (uint)_operationState.WritingHeadMemory.Length)
{
- if ((uint)bytes > (uint)_writingHeadMemory.Length)
- {
- ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
- }
-
- AdvanceCore(bytes);
+ ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
}
+
+ AdvanceCore(bytes);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AdvanceCore(int bytesWritten)
{
- _unflushedBytes += bytesWritten;
- _writingHeadBytesBuffered += bytesWritten;
- _writingHeadMemory = _writingHeadMemory.Slice(bytesWritten);
+ if (!_operationState.IsWritingActive)
+ {
+ ThrowHelper.ThrowInvalidOperationException_NoWriteToAdvance();
+ }
+
+ _operationState.UnflushedBytes += bytesWritten;
+ _operationState.WritingHeadBytesBuffered += bytesWritten;
+ _operationState.WritingHeadMemory = _operationState.WritingHeadMemory.Slice(bytesWritten);
}
internal ValueTask FlushAsync(CancellationToken cancellationToken)
@@ -338,7 +374,7 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken)
ValueTask result;
lock (_sync)
{
- PrepareFlush(out completionData, out result, cancellationToken);
+ PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}
TrySchedule(_readerScheduler, completionData);
@@ -346,9 +382,9 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken)
return result;
}
- private void PrepareFlush(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken)
+ private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken)
{
- var wasEmpty = CommitUnsynchronized();
+ bool wasEmpty = CommitUnsynchronized();
// AttachToken before completing reader awaiter in case cancellationToken is already completed
_writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
@@ -385,6 +421,29 @@ private void PrepareFlush(out CompletionData completionData, out ValueTask= 0)
+ if (examinedSegment != null && _operationState.LastExaminedIndex >= 0)
{
- long examinedBytes = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex);
- long oldLength = _unconsumedBytes;
+ long examinedBytes = BufferSegment.GetLength(_operationState.LastExaminedIndex, examinedSegment, examinedIndex);
+ long oldLength = _operationState.UnconsumedBytes;
if (examinedBytes < 0)
{
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedPosition();
}
- _unconsumedBytes -= examinedBytes;
+ _operationState.UnconsumedBytes -= examinedBytes;
// Store the absolute position
- _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex;
+ _operationState.LastExaminedIndex = examinedSegment.RunningIndex + examinedIndex;
- Debug.Assert(_unconsumedBytes >= 0, "Length has gone negative");
+ Debug.Assert(_operationState.UnconsumedBytes >= 0, "Length has gone negative");
if (oldLength >= _resumeWriterThreshold &&
- _unconsumedBytes < _resumeWriterThreshold)
+ _operationState.UnconsumedBytes < _resumeWriterThreshold)
{
_writerAwaitable.Complete(out completionData);
}
}
+ BufferSegment? returnStart = null;
+ BufferSegment? returnEnd = null;
if (consumedSegment != null)
{
- if (_readHead == null)
+ if (_operationState.ReadHead == null)
{
ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
return;
}
- returnStart = _readHead;
+ returnStart = _operationState.ReadHead;
returnEnd = consumedSegment;
void MoveReturnEndToNextBlock()
{
BufferSegment? nextBlock = returnEnd!.NextSegment;
- if (_readTail == returnEnd)
+ if (_operationState.ReadTail == returnEnd)
{
- _readTail = nextBlock;
- _readTailIndex = 0;
+ _operationState.ReadTail = nextBlock;
+ _operationState.ReadTailIndex = 0;
}
- _readHead = nextBlock;
- _readHeadIndex = 0;
+ _operationState.ReadHead = nextBlock;
+ _operationState.ReadHeadIndex = 0;
returnEnd = nextBlock;
}
@@ -505,30 +563,30 @@ void MoveReturnEndToNextBlock()
{
// If the writing head isn't block we're about to return, then we can move to the next one
// and return this block safely
- if (_writingHead != returnEnd)
+ if (_operationState.WritingHead != returnEnd)
{
MoveReturnEndToNextBlock();
}
// If the writing head is the same as the block to be returned, then we need to make sure
// there's no pending write and that there's no buffered data for the writing head
- else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive)
+ else if (_operationState.WritingHeadBytesBuffered == 0 && !_operationState.IsWritingActive)
{
// Reset the writing head to null if it's the return block and we've consumed everything
- _writingHead = null;
- _writingHeadMemory = default;
+ _operationState.WritingHead = null;
+ _operationState.WritingHeadMemory = default;
MoveReturnEndToNextBlock();
}
else
{
- _readHead = consumedSegment;
- _readHeadIndex = consumedIndex;
+ _operationState.ReadHead = consumedSegment;
+ _operationState.ReadHeadIndex = consumedIndex;
}
}
else
{
- _readHead = consumedSegment;
- _readHeadIndex = consumedIndex;
+ _operationState.ReadHead = consumedSegment;
+ _operationState.ReadHeadIndex = consumedIndex;
}
}
@@ -541,12 +599,23 @@ void MoveReturnEndToNextBlock()
_readerAwaitable.SetUncompleted();
}
- while (returnStart != null && returnStart != returnEnd)
+ if (returnStart != null && returnStart != returnEnd)
{
BufferSegment? next = returnStart.NextSegment;
returnStart.ResetMemory();
- ReturnSegmentUnsynchronized(returnStart);
+ if (Interlocked.CompareExchange(ref _pooledBufferSegment, returnStart, null) != null)
+ {
+ ReturnSegmentUnsynchronized(returnStart);
+ }
returnStart = next;
+
+ while (returnStart != null && returnStart != returnEnd)
+ {
+ next = returnStart.NextSegment;
+ returnStart.ResetMemory();
+ ReturnSegmentUnsynchronized(returnStart);
+ returnStart = next;
+ }
}
_operationState.EndRead();
@@ -655,7 +724,6 @@ internal ValueTask ReadAsync(CancellationToken token)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
- ValueTask result;
lock (_sync)
{
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
@@ -664,16 +732,12 @@ internal ValueTask ReadAsync(CancellationToken token)
if (_readerAwaitable.IsCompleted)
{
GetReadResult(out ReadResult readResult);
- result = new ValueTask(readResult);
- }
- else
- {
- // Otherwise it's async
- result = new ValueTask(_reader, token: 0);
+ return new ValueTask(readResult);
}
}
- return result;
+ // Otherwise it's async
+ return new ValueTask(_reader, token: 0);
}
internal bool TryRead(out ReadResult result)
@@ -685,7 +749,7 @@ internal bool TryRead(out ReadResult result)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
- if (_unconsumedBytes > 0 || _readerAwaitable.IsCompleted)
+ if (_operationState.UnconsumedBytes > 0 || _readerAwaitable.IsCompleted)
{
GetReadResult(out result);
return true;
@@ -785,9 +849,9 @@ private void CompletePipe()
_disposed = true;
// Return all segments
- // if _readHead is null we need to try return _commitHead
+ // if ReadHead is null we need to try return _commitHead
// because there might be a block allocated for writing
- BufferSegment? segment = _readHead ?? _readTail;
+ BufferSegment? segment = _operationState.ReadHead ?? _operationState.ReadTail;
while (segment != null)
{
BufferSegment returnSegment = segment;
@@ -796,10 +860,10 @@ private void CompletePipe()
returnSegment.ResetMemory();
}
- _writingHead = null;
- _readHead = null;
- _readTail = null;
- _lastExaminedIndex = -1;
+ _operationState.WritingHead = null;
+ _operationState.ReadHead = null;
+ _operationState.ReadTail = null;
+ _operationState.LastExaminedIndex = -1;
}
}
@@ -865,12 +929,12 @@ private void GetReadResult(out ReadResult result)
bool isCanceled = _readerAwaitable.ObserveCancellation();
// No need to read end if there is no head
- BufferSegment? head = _readHead;
+ BufferSegment? head = _operationState.ReadHead;
if (head != null)
{
- Debug.Assert(_readTail != null);
+ Debug.Assert(_operationState.ReadTail != null);
// Reading commit head shared with writer
- var readOnlySequence = new ReadOnlySequence(head, _readHeadIndex, _readTail, _readTailIndex);
+ var readOnlySequence = new ReadOnlySequence(head, _operationState.ReadHeadIndex, _operationState.ReadTail, _operationState.ReadTailIndex);
result = new ReadResult(readOnlySequence, isCanceled, isCompleted);
}
else
@@ -946,23 +1010,24 @@ private void GetFlushResult(ref FlushResult result)
internal ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken)
{
- if (_writerCompletion.IsCompleted)
- {
- ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
- }
-
CompletionData completionData;
ValueTask result;
lock (_sync)
{
+ ThrowIfWriterCompleted();
// Allocate whatever the pool gives us so we can write, this also marks the
// state as writing
AllocateWriteHeadIfNeeded(0);
+ if (_writerCompletionException != null)
+ {
+ CompleteWriter();
+ ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
+ }
- if (source.Length <= _writingHeadMemory.Length)
+ if (source.Length <= _operationState.WritingHeadMemory.Length)
{
- source.CopyTo(_writingHeadMemory);
+ source.CopyTo(_operationState.WritingHeadMemory);
AdvanceCore(source.Length);
}
@@ -972,17 +1037,18 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella
WriteMultiSegment(source.Span);
}
- PrepareFlush(out completionData, out result, cancellationToken);
+ PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}
TrySchedule(_readerScheduler, completionData);
+
return result;
}
private void WriteMultiSegment(ReadOnlySpan source)
{
- Debug.Assert(_writingHead != null);
- Span destination = _writingHeadMemory.Span;
+ Debug.Assert(_operationState.WritingHead != null);
+ Span destination = _operationState.WritingHeadMemory.Span;
while (true)
{
@@ -997,17 +1063,17 @@ private void WriteMultiSegment(ReadOnlySpan source)
}
// We filled the segment
- _writingHead.End += writable;
- _writingHeadBytesBuffered = 0;
+ _operationState.WritingHead.End += writable;
+ _operationState.WritingHeadBytesBuffered = 0;
// This is optimized to use pooled memory. That's why we pass 0 instead of
// source.Length
BufferSegment newSegment = AllocateSegment(0);
- _writingHead.SetNext(newSegment);
- _writingHead = newSegment;
+ _operationState.WritingHead.SetNext(newSegment);
+ _operationState.WritingHead = newSegment;
- destination = _writingHeadMemory.Span;
+ destination = _operationState.WritingHeadMemory.Span;
}
}
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs
index 1271df5a9a11c8..551c40869bb952 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs
@@ -4,70 +4,195 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
namespace System.IO.Pipelines
{
- [DebuggerDisplay("State: {_state}")]
+ [DebuggerDisplay("State: {State}")]
+ [StructLayout(LayoutKind.Explicit)]
internal struct PipeOperationState
{
- private State _state;
+ // Ensure reader and writer data not on same cache line
+ [FieldOffset(0)]
+ private ReaderData _readerData;
+ [FieldOffset(128)]
+ private WriterData _writerData;
+
+ public BufferSegment? WritingHead
+ {
+ get => _writerData._writingHead;
+ set => _writerData._writingHead = value;
+ }
+
+ public Memory WritingHeadMemory
+ {
+ get => _writerData._writingHeadMemory;
+ set => _writerData._writingHeadMemory = value;
+ }
+
+ public int WritingHeadBytesBuffered
+ {
+ get => _writerData._writingHeadBytesBuffered;
+ set => _writerData._writingHeadBytesBuffered = value;
+ }
+
+ public long UnflushedBytes
+ {
+ get => _writerData._unflushedBytes;
+ set => _writerData._unflushedBytes = value;
+ }
+
+ public BufferSegment? ReadTail
+ {
+ get => _readerData._readTail;
+ set => _readerData._readTail = value;
+ }
+
+ public int ReadTailIndex
+ {
+ get => _readerData._readTailIndex;
+ set => _readerData._readTailIndex = value;
+ }
+
+ public BufferSegment? ReadHead
+ {
+ get => _readerData._readHead;
+ set => _readerData._readHead = value;
+ }
+
+ public int ReadHeadIndex
+ {
+ get => _readerData._readHeadIndex;
+ set => _readerData._readHeadIndex = value;
+ }
+
+ public long UnconsumedBytes
+ {
+ get => _readerData._unconsumedBytes;
+ set => _readerData._unconsumedBytes = value;
+ }
+
+ public long LastExaminedIndex
+ {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ get => _readerData._lastExaminedIndex;
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ set => _readerData._lastExaminedIndex = value;
+ }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginRead()
{
- if ((_state & State.Reading) == State.Reading)
+ if ((_readerData._readState & ReadState.Reading) != 0)
{
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
}
- _state |= State.Reading;
+ _readerData._readState |= ReadState.Reading;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginReadTentative()
{
- if ((_state & State.Reading) == State.Reading)
+ if ((_readerData._readState & ReadState.Reading) != 0)
{
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
}
- _state |= State.ReadingTentative;
+ _readerData._readState |= ReadState.ReadingTentative;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndRead()
{
- if ((_state & State.Reading) != State.Reading &&
- (_state & State.ReadingTentative) != State.ReadingTentative)
+ if (_readerData._readState == ReadState.Inactive)
{
ThrowHelper.ThrowInvalidOperationException_NoReadToComplete();
}
- _state &= ~(State.Reading | State.ReadingTentative);
+ _readerData._readState = ReadState.Inactive;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginWrite()
{
- _state |= State.Writing;
+ _writerData._writeState = (WriteState.Allocating | WriteState.Writing);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndWrite()
{
- _state &= ~State.Writing;
+ _writerData._writeState = WriteState.Inactive;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public void EndWriteAllocation()
+ {
+ Debug.Assert(_writerData._writeState == (WriteState.Allocating | WriteState.Writing));
+ _writerData._writeState = WriteState.Writing;
+ }
+
+ public bool IsWritingActive
+ {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ get => (_writerData._writeState & WriteState.Writing) != 0;
+ }
+
+ public bool IsWritingAllocating
+ {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ get => (_writerData._writeState & WriteState.Allocating) != 0;
}
- public bool IsWritingActive => (_state & State.Writing) == State.Writing;
+ public bool IsReadingActive
+ {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ get => (_readerData._readState & ReadState.Reading) != 0;
+ }
- public bool IsReadingActive => (_state & State.Reading) == State.Reading;
+ private string State => $"WriteState: {_writerData._writeState}; ReadState: {_readerData._readState}";
+
+ private struct ReaderData
+ {
+ public ReadState _readState;
+ // The read head which is the start of the PipeReader's consumed bytes
+ public BufferSegment? _readHead;
+ public int _readHeadIndex;
+ // The extent of the bytes available to the PipeReader to consume
+ public BufferSegment? _readTail;
+ public int _readTailIndex;
+ // The number of bytes flushed but not consumed by the reader
+ public long _unconsumedBytes;
+ // Stores the last examined position, used to calculate how many bytes were to release
+ // for back pressure management
+ public long _lastExaminedIndex;
+ }
+
+ private struct WriterData
+ {
+ public volatile WriteState _writeState;
+ // The write head which is the extent of the PipeWriter's written bytes
+ public BufferSegment? _writingHead;
+ public Memory _writingHeadMemory;
+ public int _writingHeadBytesBuffered;
+ // The number of bytes written but not flushed
+ public long _unflushedBytes;
+ }
[Flags]
- internal enum State : byte
+ internal enum ReadState : int
{
+ Inactive = 0,
Reading = 1,
- ReadingTentative = 2,
- Writing = 4
+ ReadingTentative = 2
+ }
+
+ [Flags]
+ internal enum WriteState : int
+ {
+ Inactive = 0,
+ Allocating = 1,
+ Writing = 2
}
}
}
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
index 4a4ff09622c3bd..9d8dabe4f0da46 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
@@ -151,7 +151,7 @@ private void AdvanceTo(BufferSegment? consumedSegment, int consumedIndex, Buffer
{
BufferSegment next = returnStart.NextSegment!;
returnStart.ResetMemory();
- ReturnSegmentUnsynchronized(returnStart);
+ ReturnSegment(returnStart);
returnStart = next;
}
}
@@ -323,7 +323,7 @@ private void AllocateReadTail()
private BufferSegment AllocateSegment()
{
- BufferSegment nextSegment = CreateSegmentUnsynchronized();
+ BufferSegment nextSegment = CreateSegment();
if (_pool is null)
{
@@ -337,7 +337,7 @@ private BufferSegment AllocateSegment()
return nextSegment;
}
- private BufferSegment CreateSegmentUnsynchronized()
+ private BufferSegment CreateSegment()
{
if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
{
@@ -347,7 +347,7 @@ private BufferSegment CreateSegmentUnsynchronized()
return new BufferSegment();
}
- private void ReturnSegmentUnsynchronized(BufferSegment segment)
+ private void ReturnSegment(BufferSegment segment)
{
Debug.Assert(segment != _readHead, "Returning _readHead segment that's in use!");
Debug.Assert(segment != _readTail, "Returning _readTail segment that's in use!");
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs
index c87d6f367cd3b0..41d2ebea766b7b 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs
@@ -150,7 +150,7 @@ private void AllocateMemory(int sizeHint)
private BufferSegment AllocateSegment(int sizeHint)
{
- BufferSegment newSegment = CreateSegmentUnsynchronized();
+ BufferSegment newSegment = CreateSegment();
if (_pool is null || sizeHint > _pool.MaxBufferSize)
{
@@ -178,7 +178,7 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
return adjustedToMaximumSize;
}
- private BufferSegment CreateSegmentUnsynchronized()
+ private BufferSegment CreateSegment()
{
if (_bufferSegmentPool.TryPop(out BufferSegment? segment))
{
@@ -188,7 +188,7 @@ private BufferSegment CreateSegmentUnsynchronized()
return new BufferSegment();
}
- private void ReturnSegmentUnsynchronized(BufferSegment segment)
+ private void ReturnSegment(BufferSegment segment)
{
if (_bufferSegmentPool.Count < MaxSegmentPoolSize)
{
@@ -297,7 +297,7 @@ private async ValueTask FlushAsyncInternal(bool writeToStream, Canc
}
returnSegment.ResetMemory();
- ReturnSegmentUnsynchronized(returnSegment);
+ ReturnSegment(returnSegment);
// Update the head segment after we return the current segment
_head = segment;
@@ -364,7 +364,7 @@ private void FlushInternal(bool writeToStream)
}
returnSegment.ResetMemory();
- ReturnSegmentUnsynchronized(returnSegment);
+ ReturnSegment(returnSegment);
// Update the head segment after we return the current segment
_head = segment;
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs
index 6a124e50f9ee5e..7e0118ff143c1c 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs
@@ -19,6 +19,11 @@ internal static class ThrowHelper
[MethodImpl(MethodImplOptions.NoInlining)]
private static Exception CreateArgumentNullException(ExceptionArgument argument) => new ArgumentNullException(argument.ToString());
+ [DoesNotReturn]
+ public static void ThrowInvalidOperationException_NoWriteToAdvance() => throw CreateInvalidOperationException_NoWriteToAdvance();
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ public static Exception CreateInvalidOperationException_NoWriteToAdvance() => new InvalidOperationException(SR.NoWritingOperationToAdvance);
+
[DoesNotReturn]
public static void ThrowInvalidOperationException_AlreadyReading() => throw CreateInvalidOperationException_AlreadyReading();
[MethodImpl(MethodImplOptions.NoInlining)]
diff --git a/src/libraries/System.IO.Pipelines/tests/BufferSegmentPoolTest.cs b/src/libraries/System.IO.Pipelines/tests/BufferSegmentPoolTest.cs
index 864920a46fb028..d78d8753d38c63 100644
--- a/src/libraries/System.IO.Pipelines/tests/BufferSegmentPoolTest.cs
+++ b/src/libraries/System.IO.Pipelines/tests/BufferSegmentPoolTest.cs
@@ -57,7 +57,7 @@ public async Task BufferSegmentsAreReused()
[Fact]
public async Task BufferSegmentsPooledUpToThreshold()
{
- int blockCount = Pipe.MaxSegmentPoolSize + 1;
+ int blockCount = Pipe.MaxSegmentPoolSize + 2;
// Write 256 blocks to ensure they get reused
for (int i = 0; i < blockCount; i++)
@@ -73,7 +73,7 @@ public async Task BufferSegmentsPooledUpToThreshold()
Assert.Equal(blockCount, oldSegments.Count);
- // This should return them all to the segment pool (256 blocks, the last block will be discarded)
+ // This should return them all to the segment pool (256 + 1) blocks, the last block will be discarded
_pipe.Reader.AdvanceTo(result.Buffer.End);
for (int i = 0; i < blockCount; i++)
@@ -91,14 +91,14 @@ public async Task BufferSegmentsPooledUpToThreshold()
_pipe.Reader.AdvanceTo(result.Buffer.End);
- // Assert Pipe.MaxSegmentPoolSize pooled segments
- for (int i = 0; i < Pipe.MaxSegmentPoolSize; i++)
+ // Assert Pipe.MaxSegmentPoolSize + 1 pooled segments (one held inline).
+ for (int i = 0; i < Pipe.MaxSegmentPoolSize + 1; i++)
{
- Assert.Same(oldSegments[i], newSegments[Pipe.MaxSegmentPoolSize - i - 1]);
+ Assert.Contains(oldSegments[i], newSegments);
}
// The last segment shouldn't exist in the new list of segments at all (it should be new)
- Assert.DoesNotContain(oldSegments[256], newSegments);
+ Assert.DoesNotContain(oldSegments[blockCount - 1], newSegments);
}
private static List> GetSegments(ReadResult result)