Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ public void SetOwnedMemory(byte[] arrayPoolBuffer)
AvailableMemory = arrayPoolBuffer;
}

public IMemoryOwner<byte>? GetMemoryBlockAndResetSegment()
{
IMemoryOwner<byte>? memoryOwner = _memoryOwner;
if (memoryOwner != null)
{
_memoryOwner = null;
// We return the owner rather than disposing
}
else
{
Debug.Assert(_array != null);
ArrayPool<byte>.Shared.Return(_array);
_array = null;
}

Next = null;
RunningIndex = 0;
Memory = default;
_next = null;
_end = 0;
AvailableMemory = default;

return memoryOwner;
}

public void ResetMemory()
{
IMemoryOwner<byte>? memoryOwner = _memoryOwner;
Expand Down
50 changes: 41 additions & 9 deletions src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -438,11 +438,10 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
}

BufferSegment? returnStart = null;
BufferSegment? returnEnd = null;

CompletionData completionData = default;

IMemoryOwner<byte>? blockToReturn = null;
bool success;
lock (_sync)
{
var examinedEverything = false;
Expand Down Expand Up @@ -475,6 +474,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu
}
}

BufferSegment? returnStart = null;
BufferSegment? returnEnd = null;
if (consumedSegment != null)
{
if (_readHead == null)
Expand Down Expand Up @@ -541,18 +542,49 @@ void MoveReturnEndToNextBlock()
_readerAwaitable.SetUncompleted();
}

while (returnStart != null && returnStart != returnEnd)
if (returnStart != null && returnStart != returnEnd)
{
BufferSegment? next = returnStart.NextSegment;
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
if (next == null || next == returnEnd)
{
// Fast-path, single block to return; we will return the block outside of the lock.
blockToReturn = returnStart.GetMemoryBlockAndResetSegment();
ReturnSegmentUnsynchronized(returnStart);
}
else
{
// Multiple blocks to return; we will do it inside of lock.
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
do
{
next = returnStart.NextSegment;
returnStart.ResetMemory();
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
} while (returnStart != null && returnStart != returnEnd) ;
}
}

_operationState.EndRead();
success = _operationState.TryEndRead();
}

TrySchedule(_writerScheduler, completionData);
if (success)
{
TrySchedule(_writerScheduler, completionData);
}

// Return the block before throwing the exception if there is one.
if (blockToReturn != null)
{
blockToReturn.Dispose();
}

if (!success)
{
ThrowHelper.ThrowInvalidOperationException_NoReadToComplete();
}
}

internal void CompleteReader(Exception? exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ public void EndRead()
_state &= ~(State.Reading | State.ReadingTentative);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryEndRead()
{
if ((_state & State.Reading) != State.Reading &&
(_state & State.ReadingTentative) != State.ReadingTentative)
{
return false;
}

_state &= ~(State.Reading | State.ReadingTentative);
return true;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginWrite()
{
Expand Down