diff --git a/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx b/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
index 1801eb4fbe37e6..f9b36abec974c5 100644
--- a/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
+++ b/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx
@@ -141,6 +141,9 @@
No reading operation to complete.
+
+ No writing operation to advance.
+
Read was canceled on underlying PipeReader.
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
index b92832c053dad5..069f3f59de008d 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
@@ -52,9 +52,6 @@ public sealed partial class Pipe
// The number of bytes flushed but not consumed by the reader
private long _unconsumedBytes;
- // The number of bytes written but not flushed
- private long _unflushedBytes;
-
private PipeAwaitable _readerAwaitable;
private PipeAwaitable _writerAwaitable;
@@ -78,8 +75,6 @@ public sealed partial class Pipe
// The write head which is the extent of the PipeWriter's written bytes
private BufferSegment? _writingHead;
- private Memory _writingHeadMemory;
- private int _writingHeadBytesBuffered;
// Determines what current operation is in flight (reading/writing)
private PipeOperationState _operationState;
@@ -135,7 +130,7 @@ private void ResetState()
_readTailIndex = 0;
_readHeadIndex = 0;
_lastExaminedIndex = -1;
- _unflushedBytes = 0;
+ _operationState.UnflushedBytes = 0;
_unconsumedBytes = 0;
}
@@ -153,7 +148,7 @@ internal Memory GetMemory(int sizeHint)
AllocateWriteHeadIfNeeded(sizeHint);
- return _writingHeadMemory;
+ return _operationState.WritingHeadMemory;
}
internal Span GetSpan(int sizeHint)
@@ -170,7 +165,7 @@ internal Span GetSpan(int sizeHint)
AllocateWriteHeadIfNeeded(sizeHint);
- return _writingHeadMemory.Span;
+ return _operationState.WritingHeadMemory.Span;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@@ -179,9 +174,23 @@ private void AllocateWriteHeadIfNeeded(int sizeHint)
// If writing is currently active and enough space, don't need to take the lock to just set WritingActive.
// IsWritingActive is needed to prevent the reader releasing the writers memory when it fully consumes currently written.
if (!_operationState.IsWritingActive ||
- _writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint)
+ _operationState.WritingHeadMemory.Length == 0 || _operationState.WritingHeadMemory.Length < sizeHint)
{
- AllocateWriteHeadSynchronized(sizeHint);
+ // Set wrtiting before checking anything; this is important as AdvanceReader can release the write head.
+ _operationState.BeginWrite();
+
+ if (!_operationState.IsReadingActive && _writingHead == null)
+ {
+ // No reader and no existing data; so we don't need to take the lock.
+ BufferSegment newSegment = AllocateSegment(sizeHint);
+ // Set all the pointers
+ _writingHead = _readHead = _readTail = newSegment;
+ _lastExaminedIndex = 0;
+ }
+ else
+ {
+ AllocateWriteHeadSynchronized(sizeHint);
+ }
}
}
@@ -189,28 +198,25 @@ private void AllocateWriteHeadSynchronized(int sizeHint)
{
lock (_sync)
{
- _operationState.BeginWrite();
-
if (_writingHead == null)
{
- // We need to allocate memory to write since nobody has written before
+ // No existing write head so we need to initialize everything.
BufferSegment newSegment = AllocateSegment(sizeHint);
-
// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
_lastExaminedIndex = 0;
}
else
{
- int bytesLeftInBuffer = _writingHeadMemory.Length;
+ int bytesLeftInBuffer = _operationState.WritingHeadMemory.Length;
if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint)
{
- if (_writingHeadBytesBuffered > 0)
+ if (_operationState.WritingHeadBytesBuffered > 0)
{
// Flush buffered data to the segment
- _writingHead.End += _writingHeadBytesBuffered;
- _writingHeadBytesBuffered = 0;
+ _writingHead.End += _operationState.WritingHeadBytesBuffered;
+ _operationState.WritingHeadBytesBuffered = 0;
}
BufferSegment newSegment = AllocateSegment(sizeHint);
@@ -230,7 +236,8 @@ private BufferSegment AllocateSegment(int sizeHint)
if (_pool != null && sizeHint <= maxSize)
{
// Use the specified pool as it fits
- newSegment.SetOwnedMemory(_pool.Rent(GetSegmentSize(sizeHint, maxSize)));
+ int sizeToRequest = GetSegmentSize(sizeHint, maxSize);
+ newSegment.SetOwnedMemory(_pool.Rent(sizeToRequest));
}
else
{
@@ -239,7 +246,7 @@ private BufferSegment AllocateSegment(int sizeHint)
newSegment.SetOwnedMemory(ArrayPool.Shared.Rent(sizeToRequest));
}
- _writingHeadMemory = newSegment.AvailableMemory;
+ _operationState.WritingHeadMemory = newSegment.AvailableMemory;
return newSegment;
}
@@ -279,7 +286,7 @@ internal bool CommitUnsynchronized()
{
_operationState.EndWrite();
- if (_unflushedBytes == 0)
+ if (_operationState.UnflushedBytes == 0)
{
// Nothing written to commit
return true;
@@ -287,14 +294,14 @@ internal bool CommitUnsynchronized()
// Update the writing head
Debug.Assert(_writingHead != null);
- _writingHead.End += _writingHeadBytesBuffered;
+ _writingHead.End += _operationState.WritingHeadBytesBuffered;
// Always move the read tail to the write head
_readTail = _writingHead;
_readTailIndex = _writingHead.End;
long oldLength = _unconsumedBytes;
- _unconsumedBytes += _unflushedBytes;
+ _unconsumedBytes += _operationState.UnflushedBytes;
// Do not reset if reader is complete
if (_pauseWriterThreshold > 0 &&
@@ -305,31 +312,33 @@ internal bool CommitUnsynchronized()
_writerAwaitable.SetUncompleted();
}
- _unflushedBytes = 0;
- _writingHeadBytesBuffered = 0;
+ _operationState.UnflushedBytes = 0;
+ _operationState.WritingHeadBytesBuffered = 0;
return false;
}
internal void Advance(int bytes)
{
- lock (_sync)
+ if ((uint)bytes > (uint)_operationState.WritingHeadMemory.Length)
{
- if ((uint)bytes > (uint)_writingHeadMemory.Length)
- {
- ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
- }
-
- AdvanceCore(bytes);
+ ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
}
+
+ AdvanceCore(bytes);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AdvanceCore(int bytesWritten)
{
- _unflushedBytes += bytesWritten;
- _writingHeadBytesBuffered += bytesWritten;
- _writingHeadMemory = _writingHeadMemory.Slice(bytesWritten);
+ if (!_operationState.IsWritingActive)
+ {
+ ThrowHelper.ThrowInvalidOperationException_NoWriteToAdvance();
+ }
+
+ _operationState.UnflushedBytes += bytesWritten;
+ _operationState.WritingHeadBytesBuffered += bytesWritten;
+ _operationState.WritingHeadMemory = _operationState.WritingHeadMemory.Slice(bytesWritten);
}
internal ValueTask FlushAsync(CancellationToken cancellationToken)
@@ -338,7 +347,7 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken)
ValueTask result;
lock (_sync)
{
- PrepareFlush(out completionData, out result, cancellationToken);
+ PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}
TrySchedule(_readerScheduler, completionData);
@@ -346,9 +355,9 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken)
return result;
}
- private void PrepareFlush(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken)
+ private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken)
{
- var wasEmpty = CommitUnsynchronized();
+ bool wasEmpty = CommitUnsynchronized();
// AttachToken before completing reader awaiter in case cancellationToken is already completed
_writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);
@@ -438,14 +447,12 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
}
- BufferSegment? returnStart = null;
- BufferSegment? returnEnd = null;
CompletionData completionData = default;
lock (_sync)
{
- var examinedEverything = false;
+ bool examinedEverything = false;
if (examinedSegment == _readTail)
{
examinedEverything = examinedIndex == _readTailIndex;
@@ -475,6 +482,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
}
}
+ BufferSegment? returnStart = null;
+ BufferSegment? returnEnd = null;
if (consumedSegment != null)
{
if (_readHead == null)
@@ -511,11 +520,11 @@ void MoveReturnEndToNextBlock()
}
// If the writing head is the same as the block to be returned, then we need to make sure
// there's no pending write and that there's no buffered data for the writing head
- else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive)
+ else if (_operationState.WritingHeadBytesBuffered == 0 && !_operationState.IsWritingActive)
{
// Reset the writing head to null if it's the return block and we've consumed everything
_writingHead = null;
- _writingHeadMemory = default;
+ _operationState.WritingHeadMemory = default;
MoveReturnEndToNextBlock();
}
@@ -655,7 +664,6 @@ internal ValueTask ReadAsync(CancellationToken token)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
- ValueTask result;
lock (_sync)
{
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
@@ -664,16 +672,12 @@ internal ValueTask ReadAsync(CancellationToken token)
if (_readerAwaitable.IsCompleted)
{
GetReadResult(out ReadResult readResult);
- result = new ValueTask(readResult);
- }
- else
- {
- // Otherwise it's async
- result = new ValueTask(_reader, token: 0);
+ return new ValueTask(readResult);
}
}
- return result;
+ // Otherwise it's async
+ return new ValueTask(_reader, token: 0);
}
internal bool TryRead(out ReadResult result)
@@ -960,9 +964,9 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella
// state as writing
AllocateWriteHeadIfNeeded(0);
- if (source.Length <= _writingHeadMemory.Length)
+ if (source.Length <= _operationState.WritingHeadMemory.Length)
{
- source.CopyTo(_writingHeadMemory);
+ source.CopyTo(_operationState.WritingHeadMemory);
AdvanceCore(source.Length);
}
@@ -972,7 +976,7 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella
WriteMultiSegment(source.Span);
}
- PrepareFlush(out completionData, out result, cancellationToken);
+ PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}
TrySchedule(_readerScheduler, completionData);
@@ -982,7 +986,7 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella
private void WriteMultiSegment(ReadOnlySpan source)
{
Debug.Assert(_writingHead != null);
- Span destination = _writingHeadMemory.Span;
+ Span destination = _operationState.WritingHeadMemory.Span;
while (true)
{
@@ -998,7 +1002,7 @@ private void WriteMultiSegment(ReadOnlySpan source)
// We filled the segment
_writingHead.End += writable;
- _writingHeadBytesBuffered = 0;
+ _operationState.WritingHeadBytesBuffered = 0;
// This is optimized to use pooled memory. That's why we pass 0 instead of
// source.Length
@@ -1007,7 +1011,7 @@ private void WriteMultiSegment(ReadOnlySpan source)
_writingHead.SetNext(newSegment);
_writingHead = newSegment;
- destination = _writingHeadMemory.Span;
+ destination = _operationState.WritingHeadMemory.Span;
}
}
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs
index 1271df5a9a11c8..df24544d53d51c 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs
@@ -4,70 +4,127 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
namespace System.IO.Pipelines
{
- [DebuggerDisplay("State: {_state}")]
+ [DebuggerDisplay("State: {State}")]
+ [StructLayout(LayoutKind.Explicit)]
internal struct PipeOperationState
{
- private State _state;
+ // Ensure reader and writer data not on same cache line
+ [FieldOffset(0)]
+ private WriterData _writerData;
+ [FieldOffset(128)]
+ private ReaderData _readerData;
+
+ public Memory WritingHeadMemory
+ {
+ get => _writerData._writingHeadMemory;
+ set => _writerData._writingHeadMemory = value;
+ }
+
+ public int WritingHeadBytesBuffered
+ {
+ get => _writerData._writingHeadBytesBuffered;
+ set => _writerData._writingHeadBytesBuffered = value;
+ }
+
+ public long UnflushedBytes
+ {
+ get => _writerData._unflushedBytes;
+ set => _writerData._unflushedBytes = value;
+ }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginRead()
{
- if ((_state & State.Reading) == State.Reading)
+ if ((_readerData._readState & ReadState.Reading) != 0)
{
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
}
- _state |= State.Reading;
+ _readerData._readState |= ReadState.Reading;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginReadTentative()
{
- if ((_state & State.Reading) == State.Reading)
+ if ((_readerData._readState & ReadState.Reading) != 0)
{
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
}
- _state |= State.ReadingTentative;
+ _readerData._readState |= ReadState.ReadingTentative;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndRead()
{
- if ((_state & State.Reading) != State.Reading &&
- (_state & State.ReadingTentative) != State.ReadingTentative)
+ if (_readerData._readState == ReadState.Inactive)
{
ThrowHelper.ThrowInvalidOperationException_NoReadToComplete();
}
- _state &= ~(State.Reading | State.ReadingTentative);
+ _readerData._readState = ReadState.Inactive;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginWrite()
{
- _state |= State.Writing;
+ _writerData._writeState = WriteState.Writing;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndWrite()
{
- _state &= ~State.Writing;
+ _writerData._writeState = WriteState.Inactive;
+ }
+
+ public bool IsWritingActive
+ {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ get => (_writerData._writeState & WriteState.Writing) != 0;
}
- public bool IsWritingActive => (_state & State.Writing) == State.Writing;
+ public bool IsReadingActive
+ {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ get => (_readerData._readState & ReadState.Reading) != 0;
+ }
+
+ private string State => $"WriteState: {_writerData._writeState}; ReadState: {_readerData._readState}";
+
+ [StructLayout(LayoutKind.Auto)]
+ private struct ReaderData
+ {
+ public volatile ReadState _readState;
+ }
- public bool IsReadingActive => (_state & State.Reading) == State.Reading;
+ [StructLayout(LayoutKind.Auto)]
+ private struct WriterData
+ {
+ public volatile WriteState _writeState;
+ // The write head which is the extent of the PipeWriter's written bytes
+ public Memory _writingHeadMemory;
+ public int _writingHeadBytesBuffered;
+ // The number of bytes written but not flushed
+ public long _unflushedBytes;
+ }
[Flags]
- internal enum State : byte
+ internal enum ReadState : int
{
+ Inactive = 0,
Reading = 1,
- ReadingTentative = 2,
- Writing = 4
+ ReadingTentative = 2
+ }
+
+ [Flags]
+ internal enum WriteState : int
+ {
+ Inactive = 0,
+ Writing = 1
}
}
}
diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs
index 6a124e50f9ee5e..6fb5936dafdf45 100644
--- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs
+++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs
@@ -84,6 +84,11 @@ internal static class ThrowHelper
public static void ThrowInvalidOperationException_InvalidZeroByteRead() => throw CreateInvalidOperationException_InvalidZeroByteRead();
[MethodImpl(MethodImplOptions.NoInlining)]
public static Exception CreateInvalidOperationException_InvalidZeroByteRead() => new InvalidOperationException(SR.InvalidZeroByteRead);
+
+ [DoesNotReturn]
+ public static void ThrowInvalidOperationException_NoWriteToAdvance() => throw CreateInvalidOperationException_NoWriteToAdvance();
+ [MethodImpl(MethodImplOptions.NoInlining)]
+ public static Exception CreateInvalidOperationException_NoWriteToAdvance() => new InvalidOperationException(SR.NoWritingOperationToAdvance);
}
internal enum ExceptionArgument