diff --git a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs index 041fa8751925d2..a8d01238b17e83 100644 --- a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs +++ b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs @@ -74,6 +74,7 @@ protected PipeWriter() { } protected internal virtual System.Threading.Tasks.Task CopyFromAsync(System.IO.Stream source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public static System.IO.Pipelines.PipeWriter Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeWriterOptions? writerOptions = null) { throw null; } public abstract System.Threading.Tasks.ValueTask FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + public virtual System.Threading.Tasks.ValueTask FlushAsync(bool isMoreData, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public abstract System.Memory GetMemory(int sizeHint = 0); public abstract System.Span GetSpan(int sizeHint = 0); [System.ObsoleteAttribute("OnReaderCompleted may not be invoked on all implementations of PipeWriter. This will be removed in a future release.")] diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs index 800b1414260838..0a6bd77a629ecc 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegment.cs @@ -60,6 +60,31 @@ public void SetOwnedMemory(byte[] arrayPoolBuffer) AvailableMemory = arrayPoolBuffer; } + public IMemoryOwner? GetMemoryBlockAndResetSegment() + { + IMemoryOwner? memoryOwner = _memoryOwner; + if (memoryOwner != null) + { + _memoryOwner = null; + // We return the owner rather than disposing + } + else + { + Debug.Assert(_array != null); + ArrayPool.Shared.Return(_array); + _array = null; + } + + Next = null; + RunningIndex = 0; + Memory = default; + _next = null; + _end = 0; + AvailableMemory = default; + + return memoryOwner; + } + public void ResetMemory() { IMemoryOwner? memoryOwner = _memoryOwner; diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/FlushResult.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/FlushResult.cs index ccd04d7e98e9ff..80fd0043db8bfd 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/FlushResult.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/FlushResult.cs @@ -2,10 +2,12 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Threading; + namespace System.IO.Pipelines { /// - /// Result returned by call + /// Result returned by call /// public struct FlushResult { @@ -30,7 +32,7 @@ public FlushResult(bool isCanceled, bool isCompleted) } /// - /// True if the current operation was canceled, otherwise false. + /// True if the current operation was canceled, otherwise false. /// public bool IsCanceled => (_resultFlags & ResultFlags.Canceled) != 0; diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeWriter.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeWriter.cs index 8e5cf95d9752dd..6276ad77dbb0b1 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeWriter.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeWriter.cs @@ -30,7 +30,9 @@ public DefaultPipeWriter(Pipe pipe) public override void OnReaderCompleted(Action callback, object? state) => _pipe.OnReaderCompleted(callback, state); #pragma warning restore CS0672 // Member overrides obsolete member - public override ValueTask FlushAsync(CancellationToken cancellationToken = default) => _pipe.FlushAsync(cancellationToken); + public override ValueTask FlushAsync(CancellationToken cancellationToken = default) => _pipe.FlushAsync(isMoreData: false, cancellationToken); + + public override ValueTask FlushAsync(bool isMoreData, CancellationToken cancellationToken = default) => _pipe.FlushAsync(isMoreData, cancellationToken); public override void Advance(int bytes) => _pipe.Advance(bytes); diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index b92832c053dad5..787426f9805dda 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -176,12 +176,68 @@ internal Span GetSpan(int sizeHint) [MethodImpl(MethodImplOptions.AggressiveInlining)] private void AllocateWriteHeadIfNeeded(int sizeHint) { - // If writing is currently active and enough space, don't need to take the lock to just set WritingActive. - // IsWritingActive is needed to prevent the reader releasing the writers memory when it fully consumes currently written. - if (!_operationState.IsWritingActive || - _writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint) + // The very first operations we do are set IsWriting and check IsReading + // as they are our guards and we may be in a data race to avoid taking a lock. + + // Set IsWrtiting before checking anything; this is important as AdvanceReader + // can release the write head to free up memory if it considers the Pipe to be idle. + if (_operationState.SetWritingIfNotWriting()) + { + // Writing wasn't active; we need to be careful. + if (!_operationState.IsReadingActive) + { + int bytesLeftInBuffer = _writingHeadMemory.Length; + if (bytesLeftInBuffer > 0 && bytesLeftInBuffer >= sizeHint) + { + // Reading wasn't active and we already have enough memory, + // so we can return directly. + return; + } + + if (_writingHead == null) + { + // If the Reader wasn't active after setting Writing active there will + // be no Read head so we are fully uncontended on BufferSegments, but + // we need to set everything up. + Debug.Assert(_readHead == null && _readTail == null, "Read heads can't be allocated at this point"); + + BufferSegment newSegment = AllocateSegment(sizeHint); + // Set all the pointers + _writingHead = _readHead = _readTail = newSegment; + _lastExaminedIndex = 0; + } + else + { + // Not enough room in current buffer; we need to rent a new BufferSegment, + // since the pipe is active we need a lock for that. + // (BufferSegment pool is shared and not guarded by the IsWriting/IsReading flags). + AllocateWriteHeadSynchronized(sizeHint); + } + } + else + { + // Writing wasn't active and overlapped with Read activity; we need to take a lock, + // to protect the Read/Write heads. + AllocateWriteHeadSynchronized(sizeHint); + } + } + else { - AllocateWriteHeadSynchronized(sizeHint); + Debug.Assert(_writingHead != null, "Writing can't be active without a Write head"); + // Writing was already active so we are safe; except if we need a new BufferSegment. + int bytesLeftInBuffer = _writingHeadMemory.Length; + if (bytesLeftInBuffer > 0 && bytesLeftInBuffer >= sizeHint) + { + // We already have enough memory, so we can return directly. + return; + } + else + { + // Not enough room in current buffer; we need to rent a new BufferSegment, + // since the pipe was active we need a lock for that. + // (BufferSegment pool is shared and not guarded by the IsWriting/IsReading flags). + AllocateWriteHeadSynchronized(sizeHint); + } } } @@ -189,13 +245,10 @@ private void AllocateWriteHeadSynchronized(int sizeHint) { lock (_sync) { - _operationState.BeginWrite(); - if (_writingHead == null) { - // We need to allocate memory to write since nobody has written before + // No existing write head so we need to initialize everything. BufferSegment newSegment = AllocateSegment(sizeHint); - // Set all the pointers _writingHead = _readHead = _readTail = newSegment; _lastExaminedIndex = 0; @@ -275,9 +328,13 @@ private void ReturnSegmentUnsynchronized(BufferSegment segment) } } - internal bool CommitUnsynchronized() + internal bool CommitUnsynchronized(bool isMoreData) { - _operationState.EndWrite(); + // If there is more data, don't end the Write + if (!isMoreData) + { + _operationState.EndWrite(); + } if (_unflushedBytes == 0) { @@ -313,15 +370,15 @@ internal bool CommitUnsynchronized() internal void Advance(int bytes) { - lock (_sync) + // Advance is not under lock, so Write must be active or Read could release + // the write head to free up idle memory before we have increased the unflushed data. + // We must have set IsWritingActive prior to adjusting the Write segments as it is a guard. + if (!_operationState.IsWritingActive || (uint)bytes > (uint)_writingHeadMemory.Length) { - if ((uint)bytes > (uint)_writingHeadMemory.Length) - { - ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes); - } - - AdvanceCore(bytes); + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes); } + + AdvanceCore(bytes); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -332,13 +389,13 @@ private void AdvanceCore(int bytesWritten) _writingHeadMemory = _writingHeadMemory.Slice(bytesWritten); } - internal ValueTask FlushAsync(CancellationToken cancellationToken) + internal ValueTask FlushAsync(bool isMoreData, CancellationToken cancellationToken) { CompletionData completionData; ValueTask result; lock (_sync) { - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlush(isMoreData, out completionData, out result, cancellationToken); } TrySchedule(_readerScheduler, completionData); @@ -346,9 +403,9 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) return result; } - private void PrepareFlush(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) + private void PrepareFlush(bool isMoreData, out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) { - var wasEmpty = CommitUnsynchronized(); + var wasEmpty = CommitUnsynchronized(isMoreData); // AttachToken before completing reader awaiter in case cancellationToken is already completed _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this); @@ -392,7 +449,7 @@ internal void CompleteWriter(Exception? exception) lock (_sync) { // Commit any pending buffers - CommitUnsynchronized(); + CommitUnsynchronized(isMoreData: false); completionCallbacks = _writerCompletion.TryComplete(exception); _readerAwaitable.Complete(out completionData); @@ -438,11 +495,10 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition(); } - BufferSegment? returnStart = null; - BufferSegment? returnEnd = null; - CompletionData completionData = default; + IMemoryOwner? blockToReturn = null; + bool success; lock (_sync) { var examinedEverything = false; @@ -475,6 +531,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu } } + BufferSegment? returnStart = null; + BufferSegment? returnEnd = null; if (consumedSegment != null) { if (_readHead == null) @@ -511,6 +569,7 @@ void MoveReturnEndToNextBlock() } // If the writing head is the same as the block to be returned, then we need to make sure // there's no pending write and that there's no buffered data for the writing head + // We must check IsWritingActive prior to touching the writing segments as it is a guard. else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive) { // Reset the writing head to null if it's the return block and we've consumed everything @@ -541,18 +600,48 @@ void MoveReturnEndToNextBlock() _readerAwaitable.SetUncompleted(); } - while (returnStart != null && returnStart != returnEnd) + if (returnStart != null && returnStart != returnEnd) { BufferSegment? next = returnStart.NextSegment; - returnStart.ResetMemory(); - ReturnSegmentUnsynchronized(returnStart); - returnStart = next; + if (next == null || next == returnEnd) + { + // Fast-path, single block to return; we will return the block outside of the lock. + blockToReturn = returnStart.GetMemoryBlockAndResetSegment(); + ReturnSegmentUnsynchronized(returnStart); + } + else + { + // Multiple blocks to return; we will do it inside of lock. + returnStart.ResetMemory(); + ReturnSegmentUnsynchronized(returnStart); + returnStart = next; + do + { + next = returnStart.NextSegment; + returnStart.ResetMemory(); + ReturnSegmentUnsynchronized(returnStart); + returnStart = next; + } while (returnStart != null && returnStart != returnEnd) ; + } } - _operationState.EndRead(); + success = _operationState.TryEndRead(); } - TrySchedule(_writerScheduler, completionData); + // Return the block before throwing the exception if there is one. + if (blockToReturn != null) + { + blockToReturn.Dispose(); + } + + if (success) + { + TrySchedule(_writerScheduler, completionData); + } + else + { + ThrowHelper.ThrowInvalidOperationException_NoReadToComplete(); + } } internal void CompleteReader(Exception? exception) @@ -655,7 +744,6 @@ internal ValueTask ReadAsync(CancellationToken token) ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } - ValueTask result; lock (_sync) { _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); @@ -664,16 +752,12 @@ internal ValueTask ReadAsync(CancellationToken token) if (_readerAwaitable.IsCompleted) { GetReadResult(out ReadResult readResult); - result = new ValueTask(readResult); - } - else - { - // Otherwise it's async - result = new ValueTask(_reader, token: 0); + return new ValueTask(readResult); } } - return result; + // Otherwise it's async + return new ValueTask(_reader, token: 0); } internal bool TryRead(out ReadResult result) @@ -946,16 +1030,16 @@ private void GetFlushResult(ref FlushResult result) internal ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken) { - if (_writerCompletion.IsCompleted) - { - ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); - } - CompletionData completionData; ValueTask result; lock (_sync) { + if (_writerCompletion.IsCompleted) + { + ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); + } + // Allocate whatever the pool gives us so we can write, this also marks the // state as writing AllocateWriteHeadIfNeeded(0); @@ -972,7 +1056,7 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella WriteMultiSegment(source.Span); } - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlush(isMoreData: false, out completionData, out result, cancellationToken); } TrySchedule(_readerScheduler, completionData); diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs index 1271df5a9a11c8..02ebb3b662d927 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs @@ -4,70 +4,122 @@ using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; namespace System.IO.Pipelines { - [DebuggerDisplay("State: {_state}")] + [DebuggerDisplay("State: {State}")] + [StructLayout(LayoutKind.Explicit)] internal struct PipeOperationState { - private State _state; + // Ensure reader and writer data not on same cache line + [FieldOffset(0)] + private WriterData _writerData; + [FieldOffset(128)] + private ReaderData _readerData; [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginRead() { - if ((_state & State.Reading) == State.Reading) + if ((_readerData._readState & ReadState.Reading) != 0) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } - _state |= State.Reading; + _readerData._readState = ReadState.Reading; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginReadTentative() { - if ((_state & State.Reading) == State.Reading) + if ((_readerData._readState & ReadState.Reading) != 0) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } - _state |= State.ReadingTentative; + _readerData._readState = ReadState.ReadingTentative; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndRead() { - if ((_state & State.Reading) != State.Reading && - (_state & State.ReadingTentative) != State.ReadingTentative) + if (_readerData._readState == ReadState.Inactive) { ThrowHelper.ThrowInvalidOperationException_NoReadToComplete(); } - _state &= ~(State.Reading | State.ReadingTentative); + _readerData._readState = ReadState.Inactive; } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void BeginWrite() + public bool TryEndRead() { - _state |= State.Writing; + if (_readerData._readState == ReadState.Inactive) + { + return false; + } + + _readerData._readState = ReadState.Inactive; + return true; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public bool SetWritingIfNotWriting() + { + if (_writerData._writeState != WriteState.Writing) + { + _writerData._writeState = WriteState.Writing; + return true; + } + + return false; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndWrite() { - _state &= ~State.Writing; + _writerData._writeState = WriteState.Inactive; } - public bool IsWritingActive => (_state & State.Writing) == State.Writing; + public bool IsWritingActive + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (_writerData._writeState & WriteState.Writing) != 0; + } - public bool IsReadingActive => (_state & State.Reading) == State.Reading; + public bool IsReadingActive + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (_readerData._readState & (ReadState.Reading | ReadState.ReadingTentative)) != 0; + } + + private string State => $"WriteState: {_writerData._writeState}; ReadState: {_readerData._readState}"; + + [StructLayout(LayoutKind.Auto)] + private struct ReaderData + { + public volatile ReadState _readState; + } + + [StructLayout(LayoutKind.Auto)] + private struct WriterData + { + public volatile WriteState _writeState; + } [Flags] - internal enum State : byte + internal enum ReadState : int { + Inactive = 0, Reading = 1, - ReadingTentative = 2, - Writing = 4 + ReadingTentative = 2 + } + + [Flags] + internal enum WriteState : int + { + Inactive = 0, + Writing = 1 } } } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs index b6d5cdca83c7f0..8a96dd5ded8132 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOptions.cs @@ -69,12 +69,12 @@ public PipeOptions( public bool UseSynchronizationContext { get; } /// - /// Gets amount of bytes in when starts blocking + /// Gets amount of bytes in when starts blocking /// public long PauseWriterThreshold { get; } /// - /// Gets amount of bytes in when stops blocking + /// Gets amount of bytes in when stops blocking /// public long ResumeWriterThreshold { get; } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs index 427bd2ee111f2f..71502c72b5214f 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs @@ -39,7 +39,7 @@ public virtual ValueTask CompleteAsync(Exception? exception = null) } /// - /// Cancel the pending operation. If there is none, cancels next operation, without completing the . + /// Cancel the pending operation. If there is none, cancels next operation, without completing the . /// public abstract void CancelPendingFlush(); @@ -57,6 +57,14 @@ public virtual void OnReaderCompleted(Action callback, obje /// public abstract ValueTask FlushAsync(CancellationToken cancellationToken = default); + /// + /// Makes bytes written available to and runs continuation. + /// + /// indicating that more data is to be written imminently. + /// The token to monitor for cancellation requests. The default value is . + public virtual ValueTask FlushAsync(bool isMoreData, CancellationToken cancellationToken = default) + => FlushAsync(cancellationToken); + /// public abstract void Advance(int bytes); diff --git a/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs index 128ce4c3bb4c2b..f398f80b2ccc8c 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs @@ -178,6 +178,14 @@ public void ThrowsOnAdvanceOverMemorySize() [Fact] public void ThrowsOnAdvanceWithNoMemory() + { + Memory buffer = Pipe.Writer.GetMemory(1); + Pipe.Writer.Advance(buffer.Length); + Assert.Throws(() => Pipe.Writer.Advance(1)); + } + + [Fact] + public void ThrowsOnAdvanceWithoutWrite() { PipeWriter buffer = Pipe.Writer; Assert.Throws(() => buffer.Advance(1));