Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 30 additions & 22 deletions src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,35 @@ private void AllocateWriteHeadIfNeeded(int sizeHint)
if (!_operationState.IsWritingActive ||
_writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint)
{
AllocateWriteHeadSynchronized(sizeHint);
// Set wrtiting before checking anything; this is important as AdvanceReader
// can release the write head to free up memory if it considers the Pipe to be idle.
_operationState.BeginWrite();

// Ensure a read is not in progress as it may have released the write head, but not yet returned
// the segments in which case we cannot access them without a lock.
if (!_operationState.IsReadingActive && _writingHead == null)
{
// No reader in progress and no existing data; so we don't need the lock to allocate and initalize.
BufferSegment newSegment = AllocateSegment(sizeHint);
// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
_lastExaminedIndex = 0;
}
else
{
AllocateWriteHeadSynchronized(sizeHint);
}
}
}

private void AllocateWriteHeadSynchronized(int sizeHint)
{
lock (_sync)
{
_operationState.BeginWrite();

if (_writingHead == null)
{
// We need to allocate memory to write since nobody has written before
// No existing write head so we need to initialize everything.
BufferSegment newSegment = AllocateSegment(sizeHint);

// Set all the pointers
_writingHead = _readHead = _readTail = newSegment;
_lastExaminedIndex = 0;
Expand Down Expand Up @@ -313,15 +327,14 @@ internal bool CommitUnsynchronized()

internal void Advance(int bytes)
{
lock (_sync)
// Advance is not under lock, so Write must be active or Read could release
// the write head to free up idle memory before we have increased the unflushed data.
if (!_operationState.IsWritingActive || (uint)bytes > (uint)_writingHeadMemory.Length)
{
if ((uint)bytes > (uint)_writingHeadMemory.Length)
{
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
}

AdvanceCore(bytes);
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
}

AdvanceCore(bytes);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -438,8 +451,6 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
}

BufferSegment? returnStart = null;
BufferSegment? returnEnd = null;

CompletionData completionData = default;

Expand Down Expand Up @@ -475,6 +486,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
}
}

BufferSegment? returnStart = null;
BufferSegment? returnEnd = null;
if (consumedSegment != null)
{
if (_readHead == null)
Expand Down Expand Up @@ -655,7 +668,6 @@ internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}

ValueTask<ReadResult> result;
lock (_sync)
{
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
Expand All @@ -664,16 +676,12 @@ internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
if (_readerAwaitable.IsCompleted)
{
GetReadResult(out ReadResult readResult);
result = new ValueTask<ReadResult>(readResult);
}
else
{
// Otherwise it's async
result = new ValueTask<ReadResult>(_reader, token: 0);
return new ValueTask<ReadResult>(readResult);
}
}

return result;
// Otherwise it's async
return new ValueTask<ReadResult>(_reader, token: 0);
}

internal bool TryRead(out ReadResult result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,70 +4,104 @@

using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

namespace System.IO.Pipelines
{
[DebuggerDisplay("State: {_state}")]
[DebuggerDisplay("State: {State}")]
[StructLayout(LayoutKind.Explicit)]
internal struct PipeOperationState
{
private State _state;
// Ensure reader and writer data not on same cache line
[FieldOffset(0)]
private WriterData _writerData;
[FieldOffset(128)]
private ReaderData _readerData;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginRead()
{
if ((_state & State.Reading) == State.Reading)
if ((_readerData._readState & ReadState.Reading) != 0)
{
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
}

_state |= State.Reading;
_readerData._readState = ReadState.Reading;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginReadTentative()
{
if ((_state & State.Reading) == State.Reading)
if ((_readerData._readState & ReadState.Reading) != 0)
{
ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
}

_state |= State.ReadingTentative;
_readerData._readState = ReadState.ReadingTentative;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndRead()
{
if ((_state & State.Reading) != State.Reading &&
(_state & State.ReadingTentative) != State.ReadingTentative)
if (_readerData._readState == ReadState.Inactive)
{
ThrowHelper.ThrowInvalidOperationException_NoReadToComplete();
}

_state &= ~(State.Reading | State.ReadingTentative);
_readerData._readState = ReadState.Inactive;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginWrite()
{
_state |= State.Writing;
_writerData._writeState = WriteState.Writing;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void EndWrite()
{
_state &= ~State.Writing;
_writerData._writeState = WriteState.Inactive;
}

public bool IsWritingActive => (_state & State.Writing) == State.Writing;
public bool IsWritingActive
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => (_writerData._writeState & WriteState.Writing) != 0;
}

public bool IsReadingActive
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => (_readerData._readState & (ReadState.Reading | ReadState.ReadingTentative)) != 0;
}

private string State => $"WriteState: {_writerData._writeState}; ReadState: {_readerData._readState}";

public bool IsReadingActive => (_state & State.Reading) == State.Reading;
[StructLayout(LayoutKind.Auto)]
private struct ReaderData
{
public volatile ReadState _readState;
}

[StructLayout(LayoutKind.Auto)]
private struct WriterData
{
public volatile WriteState _writeState;
}

[Flags]
internal enum State : byte
internal enum ReadState : int
{
Inactive = 0,
Reading = 1,
ReadingTentative = 2,
Writing = 4
ReadingTentative = 2
}

[Flags]
internal enum WriteState : int
{
Inactive = 0,
Writing = 1
}
}
}
8 changes: 8 additions & 0 deletions src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ public void ThrowsOnAdvanceOverMemorySize()

[Fact]
public void ThrowsOnAdvanceWithNoMemory()
{
Memory<byte> buffer = Pipe.Writer.GetMemory(1);
Pipe.Writer.Advance(buffer.Length);
Assert.Throws<ArgumentOutOfRangeException>(() => Pipe.Writer.Advance(1));
}

[Fact]
public void ThrowsOnAdvanceWithoutWrite()
{
PipeWriter buffer = Pipe.Writer;
Assert.Throws<ArgumentOutOfRangeException>(() => buffer.Advance(1));
Expand Down