From 6e3a82b8c7a46e4893bec838ce689a495c87e624 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sat, 23 May 2020 23:21:48 +0100 Subject: [PATCH 1/4] Reduce Lock contention in Pipelines --- .../System/IO/Pipelines/BufferSegmentStack.cs | 2 - .../src/System/IO/Pipelines/Pipe.cs | 165 ++++++++++++------ .../System/IO/Pipelines/PipeOperationState.cs | 55 ++++-- .../System/IO/Pipelines/StreamPipeReader.cs | 8 +- .../System/IO/Pipelines/StreamPipeWriter.cs | 10 +- .../tests/BufferSegmentPoolTest.cs | 12 +- 6 files changed, 170 insertions(+), 82 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs index 51b85733017382..472baf1f8436fb 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs @@ -3,9 +3,7 @@ // See the LICENSE file in the project root for more information. using System; -using System.Collections.Generic; using System.Runtime.CompilerServices; -using System.Text; using System.Diagnostics.CodeAnalysis; namespace System.IO.Pipelines diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index b92832c053dad5..5c090cae0dc8e7 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using System.Buffers; -using System.Collections.Generic; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; @@ -43,6 +42,7 @@ public sealed partial class Pipe // Mutable struct! Don't make this readonly private BufferSegmentStack _bufferSegmentPool; + private BufferSegment? _pooledBufferSegment; private readonly DefaultPipeReader _reader; private readonly DefaultPipeWriter _writer; @@ -72,6 +72,9 @@ public sealed partial class Pipe private readonly int _maxPooledBufferSize; private bool _disposed; + private static readonly Exception s_exceptionSentinel = new Exception(); + private volatile Exception? _writerCompletionException; + // The extent of the bytes available to the PipeReader to consume private BufferSegment? _readTail; private int _readTailIndex; @@ -132,6 +135,7 @@ private void ResetState() _writerCompletion.Reset(); _readerAwaitable = new PipeAwaitable(completed: false, _useSynchronizationContext); _writerAwaitable = new PipeAwaitable(completed: true, _useSynchronizationContext); + _writerCompletionException = null; _readTailIndex = 0; _readHeadIndex = 0; _lastExaminedIndex = -1; @@ -153,6 +157,13 @@ internal Memory GetMemory(int sizeHint) AllocateWriteHeadIfNeeded(sizeHint); + _operationState.EndWriteAllocation(); + if (_writerCompletionException != null) + { + CompleteWriter(); + ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); + } + return _writingHeadMemory; } @@ -170,6 +181,13 @@ internal Span GetSpan(int sizeHint) AllocateWriteHeadIfNeeded(sizeHint); + _operationState.EndWriteAllocation(); + if (_writerCompletionException != null) + { + CompleteWriter(); + ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); + } + return _writingHeadMemory.Span; } @@ -181,50 +199,55 @@ private void AllocateWriteHeadIfNeeded(int sizeHint) if (!_operationState.IsWritingActive || _writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint) { - AllocateWriteHeadSynchronized(sizeHint); + AllocateWriteHead(sizeHint); } } - private void AllocateWriteHeadSynchronized(int sizeHint) + private void AllocateWriteHead(int sizeHint) { - lock (_sync) + // First set the Write active and block reader from returning the writer's blocks + _operationState.BeginWrite(); + + if (_writingHead == null) { - _operationState.BeginWrite(); + Debug.Assert(_readHead == null, "Returning _readHead segment that's in use!"); + Debug.Assert(_readTail == null, "Returning _readTail segment that's in use!"); - if (_writingHead == null) - { - // We need to allocate memory to write since nobody has written before - BufferSegment newSegment = AllocateSegment(sizeHint); + // We need to allocate memory to write since nobody has written before + BufferSegment newSegment = AllocateSegment(sizeHint); - // Set all the pointers - _writingHead = _readHead = _readTail = newSegment; - _lastExaminedIndex = 0; - } - else - { - int bytesLeftInBuffer = _writingHeadMemory.Length; + // Set all the pointers + _writingHead = _readHead = _readTail = newSegment; + _lastExaminedIndex = 0; + } + else + { + int bytesLeftInBuffer = _writingHeadMemory.Length; - if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint) + if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint) + { + if (_writingHeadBytesBuffered > 0) { - if (_writingHeadBytesBuffered > 0) - { - // Flush buffered data to the segment - _writingHead.End += _writingHeadBytesBuffered; - _writingHeadBytesBuffered = 0; - } + // Flush buffered data to the segment + _writingHead.End += _writingHeadBytesBuffered; + _writingHeadBytesBuffered = 0; + } - BufferSegment newSegment = AllocateSegment(sizeHint); + BufferSegment newSegment = AllocateSegment(sizeHint); - _writingHead.SetNext(newSegment); - _writingHead = newSegment; - } + _writingHead.SetNext(newSegment); + _writingHead = newSegment; } } } private BufferSegment AllocateSegment(int sizeHint) { - BufferSegment newSegment = CreateSegmentUnsynchronized(); + BufferSegment? newSegment = Interlocked.Exchange(ref _pooledBufferSegment, null); + if (newSegment is null) + { + newSegment = CreateSegmentSynchronized(); + } int maxSize = _maxPooledBufferSize; if (_pool != null && sizeHint <= maxSize) @@ -253,11 +276,14 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue) return adjustedToMaximumSize; } - private BufferSegment CreateSegmentUnsynchronized() + private BufferSegment CreateSegmentSynchronized() { - if (_bufferSegmentPool.TryPop(out BufferSegment? segment)) + lock (_sync) { - return segment; + if (_bufferSegmentPool.TryPop(out BufferSegment? segment)) + { + return segment; + } } return new BufferSegment(); @@ -385,6 +411,29 @@ private void PrepareFlush(out CompletionData completionData, out ValueTask WriteAsync(ReadOnlyMemory source, Cancella CompletionData completionData; ValueTask result; + // Allocate whatever the pool gives us so we can write, this also marks the + // state as writing + AllocateWriteHeadIfNeeded(0); + if (source.Length <= _writingHeadMemory.Length) + { + source.CopyTo(_writingHeadMemory); + + AdvanceCore(source.Length); + } + else + { + // This is the multi segment copy + WriteMultiSegment(source.Span); + } + + lock (_sync) { - // Allocate whatever the pool gives us so we can write, this also marks the - // state as writing - AllocateWriteHeadIfNeeded(0); + _operationState.EndWriteAllocation(); - if (source.Length <= _writingHeadMemory.Length) - { - source.CopyTo(_writingHeadMemory); + PrepareFlush(out completionData, out result, cancellationToken); - AdvanceCore(source.Length); - } - else + if (_writerCompletionException != null) { - // This is the multi segment copy - WriteMultiSegment(source.Span); + CompleteWriter(); } - - PrepareFlush(out completionData, out result, cancellationToken); } TrySchedule(_readerScheduler, completionData); + return result; } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs index 1271df5a9a11c8..0b2518a99d4356 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs @@ -4,70 +4,93 @@ using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; namespace System.IO.Pipelines { - [DebuggerDisplay("State: {_state}")] + [DebuggerDisplay("State: {State}")] + [StructLayout(LayoutKind.Explicit)] internal struct PipeOperationState { - private State _state; + [FieldOffset(0)] + private ReadState _readState; + [FieldOffset(64)] + private volatile WriteState _writeState; [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginRead() { - if ((_state & State.Reading) == State.Reading) + if ((_readState & ReadState.Reading) != 0) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } - _state |= State.Reading; + _readState |= ReadState.Reading; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginReadTentative() { - if ((_state & State.Reading) == State.Reading) + if ((_readState & ReadState.Reading) != 0) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } - _state |= State.ReadingTentative; + _readState |= ReadState.ReadingTentative; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndRead() { - if ((_state & State.Reading) != State.Reading && - (_state & State.ReadingTentative) != State.ReadingTentative) + if (_readState == ReadState.Inactive) { ThrowHelper.ThrowInvalidOperationException_NoReadToComplete(); } - _state &= ~(State.Reading | State.ReadingTentative); + _readState = ReadState.Inactive; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginWrite() { - _state |= State.Writing; + _writeState = (WriteState.Allocating | WriteState.Writing); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndWrite() { - _state &= ~State.Writing; + _writeState = WriteState.Inactive; } - public bool IsWritingActive => (_state & State.Writing) == State.Writing; + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public void EndWriteAllocation() + { + Debug.Assert(_writeState == (WriteState.Allocating | WriteState.Writing)); + _writeState = WriteState.Writing; + } + + public bool IsWritingActive => (_writeState & WriteState.Writing) != 0; + + public bool IsWritingAllocating => (_writeState & WriteState.Allocating) != 0; - public bool IsReadingActive => (_state & State.Reading) == State.Reading; + public bool IsReadingActive => (_readState & ReadState.Reading) != 0; + + private string State => $"WriteState: {_writeState}; ReadState: {_readState}"; [Flags] - internal enum State : byte + internal enum ReadState : int { + Inactive = 0, Reading = 1, - ReadingTentative = 2, - Writing = 4 + ReadingTentative = 2 + } + + [Flags] + internal enum WriteState : int + { + Inactive = 0, + Allocating = 1, + Writing = 2 } } } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs index 4a4ff09622c3bd..9d8dabe4f0da46 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs @@ -151,7 +151,7 @@ private void AdvanceTo(BufferSegment? consumedSegment, int consumedIndex, Buffer { BufferSegment next = returnStart.NextSegment!; returnStart.ResetMemory(); - ReturnSegmentUnsynchronized(returnStart); + ReturnSegment(returnStart); returnStart = next; } } @@ -323,7 +323,7 @@ private void AllocateReadTail() private BufferSegment AllocateSegment() { - BufferSegment nextSegment = CreateSegmentUnsynchronized(); + BufferSegment nextSegment = CreateSegment(); if (_pool is null) { @@ -337,7 +337,7 @@ private BufferSegment AllocateSegment() return nextSegment; } - private BufferSegment CreateSegmentUnsynchronized() + private BufferSegment CreateSegment() { if (_bufferSegmentPool.TryPop(out BufferSegment? segment)) { @@ -347,7 +347,7 @@ private BufferSegment CreateSegmentUnsynchronized() return new BufferSegment(); } - private void ReturnSegmentUnsynchronized(BufferSegment segment) + private void ReturnSegment(BufferSegment segment) { Debug.Assert(segment != _readHead, "Returning _readHead segment that's in use!"); Debug.Assert(segment != _readTail, "Returning _readTail segment that's in use!"); diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs index c87d6f367cd3b0..41d2ebea766b7b 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs @@ -150,7 +150,7 @@ private void AllocateMemory(int sizeHint) private BufferSegment AllocateSegment(int sizeHint) { - BufferSegment newSegment = CreateSegmentUnsynchronized(); + BufferSegment newSegment = CreateSegment(); if (_pool is null || sizeHint > _pool.MaxBufferSize) { @@ -178,7 +178,7 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue) return adjustedToMaximumSize; } - private BufferSegment CreateSegmentUnsynchronized() + private BufferSegment CreateSegment() { if (_bufferSegmentPool.TryPop(out BufferSegment? segment)) { @@ -188,7 +188,7 @@ private BufferSegment CreateSegmentUnsynchronized() return new BufferSegment(); } - private void ReturnSegmentUnsynchronized(BufferSegment segment) + private void ReturnSegment(BufferSegment segment) { if (_bufferSegmentPool.Count < MaxSegmentPoolSize) { @@ -297,7 +297,7 @@ private async ValueTask FlushAsyncInternal(bool writeToStream, Canc } returnSegment.ResetMemory(); - ReturnSegmentUnsynchronized(returnSegment); + ReturnSegment(returnSegment); // Update the head segment after we return the current segment _head = segment; @@ -364,7 +364,7 @@ private void FlushInternal(bool writeToStream) } returnSegment.ResetMemory(); - ReturnSegmentUnsynchronized(returnSegment); + ReturnSegment(returnSegment); // Update the head segment after we return the current segment _head = segment; diff --git a/src/libraries/System.IO.Pipelines/tests/BufferSegmentPoolTest.cs b/src/libraries/System.IO.Pipelines/tests/BufferSegmentPoolTest.cs index 864920a46fb028..d78d8753d38c63 100644 --- a/src/libraries/System.IO.Pipelines/tests/BufferSegmentPoolTest.cs +++ b/src/libraries/System.IO.Pipelines/tests/BufferSegmentPoolTest.cs @@ -57,7 +57,7 @@ public async Task BufferSegmentsAreReused() [Fact] public async Task BufferSegmentsPooledUpToThreshold() { - int blockCount = Pipe.MaxSegmentPoolSize + 1; + int blockCount = Pipe.MaxSegmentPoolSize + 2; // Write 256 blocks to ensure they get reused for (int i = 0; i < blockCount; i++) @@ -73,7 +73,7 @@ public async Task BufferSegmentsPooledUpToThreshold() Assert.Equal(blockCount, oldSegments.Count); - // This should return them all to the segment pool (256 blocks, the last block will be discarded) + // This should return them all to the segment pool (256 + 1) blocks, the last block will be discarded _pipe.Reader.AdvanceTo(result.Buffer.End); for (int i = 0; i < blockCount; i++) @@ -91,14 +91,14 @@ public async Task BufferSegmentsPooledUpToThreshold() _pipe.Reader.AdvanceTo(result.Buffer.End); - // Assert Pipe.MaxSegmentPoolSize pooled segments - for (int i = 0; i < Pipe.MaxSegmentPoolSize; i++) + // Assert Pipe.MaxSegmentPoolSize + 1 pooled segments (one held inline). + for (int i = 0; i < Pipe.MaxSegmentPoolSize + 1; i++) { - Assert.Same(oldSegments[i], newSegments[Pipe.MaxSegmentPoolSize - i - 1]); + Assert.Contains(oldSegments[i], newSegments); } // The last segment shouldn't exist in the new list of segments at all (it should be new) - Assert.DoesNotContain(oldSegments[256], newSegments); + Assert.DoesNotContain(oldSegments[blockCount - 1], newSegments); } private static List> GetSegments(ReadResult result) From 9a6cf3ba2e8c865ea0e8b89e29734211be2b140b Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 24 May 2020 00:47:20 +0100 Subject: [PATCH 2/4] Reduce contention --- .../src/Resources/Strings.resx | 3 + .../src/System/IO/Pipelines/Pipe.cs | 224 ++++++++---------- .../System/IO/Pipelines/PipeOperationState.cs | 136 +++++++++-- .../src/System/IO/Pipelines/ThrowHelper.cs | 5 + 4 files changed, 227 insertions(+), 141 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx b/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx index 1801eb4fbe37e6..f9b36abec974c5 100644 --- a/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx +++ b/src/libraries/System.IO.Pipelines/src/Resources/Strings.resx @@ -141,6 +141,9 @@ No reading operation to complete. + + No writing operation to advance. + Read was canceled on underlying PipeReader. diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 5c090cae0dc8e7..be72e7169edd06 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -40,20 +40,15 @@ public sealed partial class Pipe private readonly PipeScheduler _readerScheduler; private readonly PipeScheduler _writerScheduler; - // Mutable struct! Don't make this readonly - private BufferSegmentStack _bufferSegmentPool; - private BufferSegment? _pooledBufferSegment; - private readonly DefaultPipeReader _reader; private readonly DefaultPipeWriter _writer; private readonly bool _useSynchronizationContext; + private readonly int _maxPooledBufferSize; - // The number of bytes flushed but not consumed by the reader - private long _unconsumedBytes; - - // The number of bytes written but not flushed - private long _unflushedBytes; + // Mutable struct! Don't make this readonly + private BufferSegmentStack _bufferSegmentPool; + private BufferSegment? _pooledBufferSegment; private PipeAwaitable _readerAwaitable; private PipeAwaitable _writerAwaitable; @@ -61,34 +56,16 @@ public sealed partial class Pipe private PipeCompletion _writerCompletion; private PipeCompletion _readerCompletion; - // Stores the last examined position, used to calculate how many bytes were to release - // for back pressure management - private long _lastExaminedIndex = -1; - - // The read head which is the start of the PipeReader's consumed bytes - private BufferSegment? _readHead; - private int _readHeadIndex; - - private readonly int _maxPooledBufferSize; private bool _disposed; private static readonly Exception s_exceptionSentinel = new Exception(); private volatile Exception? _writerCompletionException; - // The extent of the bytes available to the PipeReader to consume - private BufferSegment? _readTail; - private int _readTailIndex; - - // The write head which is the extent of the PipeWriter's written bytes - private BufferSegment? _writingHead; - private Memory _writingHeadMemory; - private int _writingHeadBytesBuffered; - // Determines what current operation is in flight (reading/writing) private PipeOperationState _operationState; - internal long Length => _unconsumedBytes; + internal long Length => _operationState.UnconsumedBytes; /// /// Initializes the using as options. @@ -110,6 +87,7 @@ public Pipe(PipeOptions options) _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize); _operationState = default; + _operationState.LastExaminedIndex = -1; _readerCompletion = default; _writerCompletion = default; @@ -136,11 +114,11 @@ private void ResetState() _readerAwaitable = new PipeAwaitable(completed: false, _useSynchronizationContext); _writerAwaitable = new PipeAwaitable(completed: true, _useSynchronizationContext); _writerCompletionException = null; - _readTailIndex = 0; - _readHeadIndex = 0; - _lastExaminedIndex = -1; - _unflushedBytes = 0; - _unconsumedBytes = 0; + _operationState.ReadTailIndex = 0; + _operationState.ReadHeadIndex = 0; + _operationState.LastExaminedIndex = -1; + _operationState.UnflushedBytes = 0; + _operationState.UnconsumedBytes = 0; } internal Memory GetMemory(int sizeHint) @@ -164,7 +142,7 @@ internal Memory GetMemory(int sizeHint) ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); } - return _writingHeadMemory; + return _operationState.WritingHeadMemory; } internal Span GetSpan(int sizeHint) @@ -188,7 +166,7 @@ internal Span GetSpan(int sizeHint) ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); } - return _writingHeadMemory.Span; + return _operationState.WritingHeadMemory.Span; } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -197,7 +175,7 @@ private void AllocateWriteHeadIfNeeded(int sizeHint) // If writing is currently active and enough space, don't need to take the lock to just set WritingActive. // IsWritingActive is needed to prevent the reader releasing the writers memory when it fully consumes currently written. if (!_operationState.IsWritingActive || - _writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint) + _operationState.WritingHeadMemory.Length == 0 || _operationState.WritingHeadMemory.Length < sizeHint) { AllocateWriteHead(sizeHint); } @@ -208,35 +186,35 @@ private void AllocateWriteHead(int sizeHint) // First set the Write active and block reader from returning the writer's blocks _operationState.BeginWrite(); - if (_writingHead == null) + if (_operationState.WritingHead == null) { - Debug.Assert(_readHead == null, "Returning _readHead segment that's in use!"); - Debug.Assert(_readTail == null, "Returning _readTail segment that's in use!"); + Debug.Assert(_operationState.ReadHead == null, "Returning ReadHead segment that's in use!"); + Debug.Assert(_operationState.ReadTail == null, "Returning ReadTail segment that's in use!"); // We need to allocate memory to write since nobody has written before BufferSegment newSegment = AllocateSegment(sizeHint); // Set all the pointers - _writingHead = _readHead = _readTail = newSegment; - _lastExaminedIndex = 0; + _operationState.WritingHead = _operationState.ReadHead = _operationState.ReadTail = newSegment; + _operationState.LastExaminedIndex = 0; } else { - int bytesLeftInBuffer = _writingHeadMemory.Length; + int bytesLeftInBuffer = _operationState.WritingHeadMemory.Length; if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint) { - if (_writingHeadBytesBuffered > 0) + if (_operationState.WritingHeadBytesBuffered > 0) { // Flush buffered data to the segment - _writingHead.End += _writingHeadBytesBuffered; - _writingHeadBytesBuffered = 0; + _operationState.WritingHead.End += _operationState.WritingHeadBytesBuffered; + _operationState.WritingHeadBytesBuffered = 0; } BufferSegment newSegment = AllocateSegment(sizeHint); - _writingHead.SetNext(newSegment); - _writingHead = newSegment; + _operationState.WritingHead.SetNext(newSegment); + _operationState.WritingHead = newSegment; } } } @@ -253,7 +231,8 @@ private BufferSegment AllocateSegment(int sizeHint) if (_pool != null && sizeHint <= maxSize) { // Use the specified pool as it fits - newSegment.SetOwnedMemory(_pool.Rent(GetSegmentSize(sizeHint, maxSize))); + int sizeToRequest = GetSegmentSize(sizeHint, maxSize); + newSegment.SetOwnedMemory(_pool.Rent(sizeToRequest)); } else { @@ -262,7 +241,7 @@ private BufferSegment AllocateSegment(int sizeHint) newSegment.SetOwnedMemory(ArrayPool.Shared.Rent(sizeToRequest)); } - _writingHeadMemory = newSegment.AvailableMemory; + _operationState.WritingHeadMemory = newSegment.AvailableMemory; return newSegment; } @@ -291,9 +270,9 @@ private BufferSegment CreateSegmentSynchronized() private void ReturnSegmentUnsynchronized(BufferSegment segment) { - Debug.Assert(segment != _readHead, "Returning _readHead segment that's in use!"); - Debug.Assert(segment != _readTail, "Returning _readTail segment that's in use!"); - Debug.Assert(segment != _writingHead, "Returning _writingHead segment that's in use!"); + Debug.Assert(segment != _operationState.ReadHead, "Returning ReadHead segment that's in use!"); + Debug.Assert(segment != _operationState.ReadTail, "Returning ReadTail segment that's in use!"); + Debug.Assert(segment != _operationState.WritingHead, "Returning WritingHead segment that's in use!"); if (_bufferSegmentPool.Count < MaxSegmentPoolSize) { @@ -305,57 +284,59 @@ internal bool CommitUnsynchronized() { _operationState.EndWrite(); - if (_unflushedBytes == 0) + if (_operationState.UnflushedBytes == 0) { // Nothing written to commit return true; } // Update the writing head - Debug.Assert(_writingHead != null); - _writingHead.End += _writingHeadBytesBuffered; + Debug.Assert(_operationState.WritingHead != null); + _operationState.WritingHead.End += _operationState.WritingHeadBytesBuffered; // Always move the read tail to the write head - _readTail = _writingHead; - _readTailIndex = _writingHead.End; + _operationState.ReadTail = _operationState.WritingHead; + _operationState.ReadTailIndex = _operationState.WritingHead.End; - long oldLength = _unconsumedBytes; - _unconsumedBytes += _unflushedBytes; + long oldLength = _operationState.UnconsumedBytes; + _operationState.UnconsumedBytes += _operationState.UnflushedBytes; // Do not reset if reader is complete if (_pauseWriterThreshold > 0 && oldLength < _pauseWriterThreshold && - _unconsumedBytes >= _pauseWriterThreshold && + _operationState.UnconsumedBytes >= _pauseWriterThreshold && !_readerCompletion.IsCompleted) { _writerAwaitable.SetUncompleted(); } - _unflushedBytes = 0; - _writingHeadBytesBuffered = 0; + _operationState.UnflushedBytes = 0; + _operationState.WritingHeadBytesBuffered = 0; return false; } internal void Advance(int bytes) { - lock (_sync) + if ((uint)bytes > (uint)_operationState.WritingHeadMemory.Length) { - if ((uint)bytes > (uint)_writingHeadMemory.Length) - { - ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes); - } - - AdvanceCore(bytes); + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes); } + + AdvanceCore(bytes); } [MethodImpl(MethodImplOptions.AggressiveInlining)] private void AdvanceCore(int bytesWritten) { - _unflushedBytes += bytesWritten; - _writingHeadBytesBuffered += bytesWritten; - _writingHeadMemory = _writingHeadMemory.Slice(bytesWritten); + if (!_operationState.IsWritingActive) + { + ThrowHelper.ThrowInvalidOperationException_NoWriteToAdvance(); + } + + _operationState.UnflushedBytes += bytesWritten; + _operationState.WritingHeadBytesBuffered += bytesWritten; + _operationState.WritingHeadMemory = _operationState.WritingHeadMemory.Slice(bytesWritten); } internal ValueTask FlushAsync(CancellationToken cancellationToken) @@ -491,31 +472,31 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu lock (_sync) { - var examinedEverything = false; - if (examinedSegment == _readTail) + bool examinedEverything = false; + if (examinedSegment == _operationState.ReadTail) { - examinedEverything = examinedIndex == _readTailIndex; + examinedEverything = examinedIndex == _operationState.ReadTailIndex; } - if (examinedSegment != null && _lastExaminedIndex >= 0) + if (examinedSegment != null && _operationState.LastExaminedIndex >= 0) { - long examinedBytes = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex); - long oldLength = _unconsumedBytes; + long examinedBytes = BufferSegment.GetLength(_operationState.LastExaminedIndex, examinedSegment, examinedIndex); + long oldLength = _operationState.UnconsumedBytes; if (examinedBytes < 0) { ThrowHelper.ThrowInvalidOperationException_InvalidExaminedPosition(); } - _unconsumedBytes -= examinedBytes; + _operationState.UnconsumedBytes -= examinedBytes; // Store the absolute position - _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex; + _operationState.LastExaminedIndex = examinedSegment.RunningIndex + examinedIndex; - Debug.Assert(_unconsumedBytes >= 0, "Length has gone negative"); + Debug.Assert(_operationState.UnconsumedBytes >= 0, "Length has gone negative"); if (oldLength >= _resumeWriterThreshold && - _unconsumedBytes < _resumeWriterThreshold) + _operationState.UnconsumedBytes < _resumeWriterThreshold) { _writerAwaitable.Complete(out completionData); } @@ -525,26 +506,26 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu BufferSegment? returnEnd = null; if (consumedSegment != null) { - if (_readHead == null) + if (_operationState.ReadHead == null) { ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor(); return; } - returnStart = _readHead; + returnStart = _operationState.ReadHead; returnEnd = consumedSegment; void MoveReturnEndToNextBlock() { BufferSegment? nextBlock = returnEnd!.NextSegment; - if (_readTail == returnEnd) + if (_operationState.ReadTail == returnEnd) { - _readTail = nextBlock; - _readTailIndex = 0; + _operationState.ReadTail = nextBlock; + _operationState.ReadTailIndex = 0; } - _readHead = nextBlock; - _readHeadIndex = 0; + _operationState.ReadHead = nextBlock; + _operationState.ReadHeadIndex = 0; returnEnd = nextBlock; } @@ -553,30 +534,30 @@ void MoveReturnEndToNextBlock() { // If the writing head isn't block we're about to return, then we can move to the next one // and return this block safely - if (_writingHead != returnEnd) + if (_operationState.WritingHead != returnEnd) { MoveReturnEndToNextBlock(); } // If the writing head is the same as the block to be returned, then we need to make sure // there's no pending write and that there's no buffered data for the writing head - else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive) + else if (_operationState.WritingHeadBytesBuffered == 0 && !_operationState.IsWritingActive) { // Reset the writing head to null if it's the return block and we've consumed everything - _writingHead = null; - _writingHeadMemory = default; + _operationState.WritingHead = null; + _operationState.WritingHeadMemory = default; MoveReturnEndToNextBlock(); } else { - _readHead = consumedSegment; - _readHeadIndex = consumedIndex; + _operationState.ReadHead = consumedSegment; + _operationState.ReadHeadIndex = consumedIndex; } } else { - _readHead = consumedSegment; - _readHeadIndex = consumedIndex; + _operationState.ReadHead = consumedSegment; + _operationState.ReadHeadIndex = consumedIndex; } } @@ -714,7 +695,6 @@ internal ValueTask ReadAsync(CancellationToken token) ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } - ValueTask result; lock (_sync) { _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); @@ -723,16 +703,12 @@ internal ValueTask ReadAsync(CancellationToken token) if (_readerAwaitable.IsCompleted) { GetReadResult(out ReadResult readResult); - result = new ValueTask(readResult); - } - else - { - // Otherwise it's async - result = new ValueTask(_reader, token: 0); + return new ValueTask(readResult); } } - return result; + // Otherwise it's async + return new ValueTask(_reader, token: 0); } internal bool TryRead(out ReadResult result) @@ -744,7 +720,7 @@ internal bool TryRead(out ReadResult result) ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } - if (_unconsumedBytes > 0 || _readerAwaitable.IsCompleted) + if (_operationState.UnconsumedBytes > 0 || _readerAwaitable.IsCompleted) { GetReadResult(out result); return true; @@ -844,9 +820,9 @@ private void CompletePipe() _disposed = true; // Return all segments - // if _readHead is null we need to try return _commitHead + // if ReadHead is null we need to try return _commitHead // because there might be a block allocated for writing - BufferSegment? segment = _readHead ?? _readTail; + BufferSegment? segment = _operationState.ReadHead ?? _operationState.ReadTail; while (segment != null) { BufferSegment returnSegment = segment; @@ -855,10 +831,10 @@ private void CompletePipe() returnSegment.ResetMemory(); } - _writingHead = null; - _readHead = null; - _readTail = null; - _lastExaminedIndex = -1; + _operationState.WritingHead = null; + _operationState.ReadHead = null; + _operationState.ReadTail = null; + _operationState.LastExaminedIndex = -1; } } @@ -924,12 +900,12 @@ private void GetReadResult(out ReadResult result) bool isCanceled = _readerAwaitable.ObserveCancellation(); // No need to read end if there is no head - BufferSegment? head = _readHead; + BufferSegment? head = _operationState.ReadHead; if (head != null) { - Debug.Assert(_readTail != null); + Debug.Assert(_operationState.ReadTail != null); // Reading commit head shared with writer - var readOnlySequence = new ReadOnlySequence(head, _readHeadIndex, _readTail, _readTailIndex); + var readOnlySequence = new ReadOnlySequence(head, _operationState.ReadHeadIndex, _operationState.ReadTail, _operationState.ReadTailIndex); result = new ReadResult(readOnlySequence, isCanceled, isCompleted); } else @@ -1016,9 +992,9 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella // Allocate whatever the pool gives us so we can write, this also marks the // state as writing AllocateWriteHeadIfNeeded(0); - if (source.Length <= _writingHeadMemory.Length) + if (source.Length <= _operationState.WritingHeadMemory.Length) { - source.CopyTo(_writingHeadMemory); + source.CopyTo(_operationState.WritingHeadMemory); AdvanceCore(source.Length); } @@ -1048,8 +1024,8 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella private void WriteMultiSegment(ReadOnlySpan source) { - Debug.Assert(_writingHead != null); - Span destination = _writingHeadMemory.Span; + Debug.Assert(_operationState.WritingHead != null); + Span destination = _operationState.WritingHeadMemory.Span; while (true) { @@ -1064,17 +1040,17 @@ private void WriteMultiSegment(ReadOnlySpan source) } // We filled the segment - _writingHead.End += writable; - _writingHeadBytesBuffered = 0; + _operationState.WritingHead.End += writable; + _operationState.WritingHeadBytesBuffered = 0; // This is optimized to use pooled memory. That's why we pass 0 instead of // source.Length BufferSegment newSegment = AllocateSegment(0); - _writingHead.SetNext(newSegment); - _writingHead = newSegment; + _operationState.WritingHead.SetNext(newSegment); + _operationState.WritingHead = newSegment; - destination = _writingHeadMemory.Span; + destination = _operationState.WritingHeadMemory.Span; } } diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs index 0b2518a99d4356..551c40869bb952 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs @@ -12,70 +12,172 @@ namespace System.IO.Pipelines [StructLayout(LayoutKind.Explicit)] internal struct PipeOperationState { + // Ensure reader and writer data not on same cache line [FieldOffset(0)] - private ReadState _readState; - [FieldOffset(64)] - private volatile WriteState _writeState; + private ReaderData _readerData; + [FieldOffset(128)] + private WriterData _writerData; + + public BufferSegment? WritingHead + { + get => _writerData._writingHead; + set => _writerData._writingHead = value; + } + + public Memory WritingHeadMemory + { + get => _writerData._writingHeadMemory; + set => _writerData._writingHeadMemory = value; + } + + public int WritingHeadBytesBuffered + { + get => _writerData._writingHeadBytesBuffered; + set => _writerData._writingHeadBytesBuffered = value; + } + + public long UnflushedBytes + { + get => _writerData._unflushedBytes; + set => _writerData._unflushedBytes = value; + } + + public BufferSegment? ReadTail + { + get => _readerData._readTail; + set => _readerData._readTail = value; + } + + public int ReadTailIndex + { + get => _readerData._readTailIndex; + set => _readerData._readTailIndex = value; + } + + public BufferSegment? ReadHead + { + get => _readerData._readHead; + set => _readerData._readHead = value; + } + + public int ReadHeadIndex + { + get => _readerData._readHeadIndex; + set => _readerData._readHeadIndex = value; + } + + public long UnconsumedBytes + { + get => _readerData._unconsumedBytes; + set => _readerData._unconsumedBytes = value; + } + + public long LastExaminedIndex + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => _readerData._lastExaminedIndex; + [MethodImpl(MethodImplOptions.AggressiveInlining)] + set => _readerData._lastExaminedIndex = value; + } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginRead() { - if ((_readState & ReadState.Reading) != 0) + if ((_readerData._readState & ReadState.Reading) != 0) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } - _readState |= ReadState.Reading; + _readerData._readState |= ReadState.Reading; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginReadTentative() { - if ((_readState & ReadState.Reading) != 0) + if ((_readerData._readState & ReadState.Reading) != 0) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } - _readState |= ReadState.ReadingTentative; + _readerData._readState |= ReadState.ReadingTentative; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndRead() { - if (_readState == ReadState.Inactive) + if (_readerData._readState == ReadState.Inactive) { ThrowHelper.ThrowInvalidOperationException_NoReadToComplete(); } - _readState = ReadState.Inactive; + _readerData._readState = ReadState.Inactive; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginWrite() { - _writeState = (WriteState.Allocating | WriteState.Writing); + _writerData._writeState = (WriteState.Allocating | WriteState.Writing); } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndWrite() { - _writeState = WriteState.Inactive; + _writerData._writeState = WriteState.Inactive; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndWriteAllocation() { - Debug.Assert(_writeState == (WriteState.Allocating | WriteState.Writing)); - _writeState = WriteState.Writing; + Debug.Assert(_writerData._writeState == (WriteState.Allocating | WriteState.Writing)); + _writerData._writeState = WriteState.Writing; } - public bool IsWritingActive => (_writeState & WriteState.Writing) != 0; + public bool IsWritingActive + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (_writerData._writeState & WriteState.Writing) != 0; + } - public bool IsWritingAllocating => (_writeState & WriteState.Allocating) != 0; + public bool IsWritingAllocating + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (_writerData._writeState & WriteState.Allocating) != 0; + } + + public bool IsReadingActive + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (_readerData._readState & ReadState.Reading) != 0; + } - public bool IsReadingActive => (_readState & ReadState.Reading) != 0; + private string State => $"WriteState: {_writerData._writeState}; ReadState: {_readerData._readState}"; - private string State => $"WriteState: {_writeState}; ReadState: {_readState}"; + private struct ReaderData + { + public ReadState _readState; + // The read head which is the start of the PipeReader's consumed bytes + public BufferSegment? _readHead; + public int _readHeadIndex; + // The extent of the bytes available to the PipeReader to consume + public BufferSegment? _readTail; + public int _readTailIndex; + // The number of bytes flushed but not consumed by the reader + public long _unconsumedBytes; + // Stores the last examined position, used to calculate how many bytes were to release + // for back pressure management + public long _lastExaminedIndex; + } + + private struct WriterData + { + public volatile WriteState _writeState; + // The write head which is the extent of the PipeWriter's written bytes + public BufferSegment? _writingHead; + public Memory _writingHeadMemory; + public int _writingHeadBytesBuffered; + // The number of bytes written but not flushed + public long _unflushedBytes; + } [Flags] internal enum ReadState : int diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs index 6a124e50f9ee5e..7e0118ff143c1c 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs @@ -19,6 +19,11 @@ internal static class ThrowHelper [MethodImpl(MethodImplOptions.NoInlining)] private static Exception CreateArgumentNullException(ExceptionArgument argument) => new ArgumentNullException(argument.ToString()); + [DoesNotReturn] + public static void ThrowInvalidOperationException_NoWriteToAdvance() => throw CreateInvalidOperationException_NoWriteToAdvance(); + [MethodImpl(MethodImplOptions.NoInlining)] + public static Exception CreateInvalidOperationException_NoWriteToAdvance() => new InvalidOperationException(SR.NoWritingOperationToAdvance); + [DoesNotReturn] public static void ThrowInvalidOperationException_AlreadyReading() => throw CreateInvalidOperationException_AlreadyReading(); [MethodImpl(MethodImplOptions.NoInlining)] From c49f318b112980baced74f364b8c1c098d223515 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 24 May 2020 04:05:48 +0100 Subject: [PATCH 3/4] Fixes --- .../src/System/IO/Pipelines/Pipe.cs | 129 ++++++++++-------- 1 file changed, 75 insertions(+), 54 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index be72e7169edd06..39bbf786451338 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -135,13 +135,6 @@ internal Memory GetMemory(int sizeHint) AllocateWriteHeadIfNeeded(sizeHint); - _operationState.EndWriteAllocation(); - if (_writerCompletionException != null) - { - CompleteWriter(); - ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); - } - return _operationState.WritingHeadMemory; } @@ -159,13 +152,6 @@ internal Span GetSpan(int sizeHint) AllocateWriteHeadIfNeeded(sizeHint); - _operationState.EndWriteAllocation(); - if (_writerCompletionException != null) - { - CompleteWriter(); - ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); - } - return _operationState.WritingHeadMemory.Span; } @@ -185,14 +171,17 @@ private void AllocateWriteHead(int sizeHint) { // First set the Write active and block reader from returning the writer's blocks _operationState.BeginWrite(); + if (_writerCompletionException != null) + { + _operationState.EndWriteAllocation(); + CompleteWriter(); + return; + } if (_operationState.WritingHead == null) { - Debug.Assert(_operationState.ReadHead == null, "Returning ReadHead segment that's in use!"); - Debug.Assert(_operationState.ReadTail == null, "Returning ReadTail segment that's in use!"); - // We need to allocate memory to write since nobody has written before - BufferSegment newSegment = AllocateSegment(sizeHint); + BufferSegment newSegment = AllocateSegmentNoActiveReading(sizeHint); // Set all the pointers _operationState.WritingHead = _operationState.ReadHead = _operationState.ReadTail = newSegment; @@ -217,6 +206,12 @@ private void AllocateWriteHead(int sizeHint) _operationState.WritingHead = newSegment; } } + + _operationState.EndWriteAllocation(); + if (_writerCompletionException != null) + { + CompleteWriter(); + } } private BufferSegment AllocateSegment(int sizeHint) @@ -227,6 +222,49 @@ private BufferSegment AllocateSegment(int sizeHint) newSegment = CreateSegmentSynchronized(); } + InitializeSegmentMemory(sizeHint, newSegment); + + return newSegment; + } + + private BufferSegment AllocateSegmentNoActiveReading(int sizeHint) + { + Debug.Assert(_operationState.ReadHead == null, "Allocating unguarded segment when reader active!"); + Debug.Assert(_operationState.ReadTail == null, "Allocating unguarded segment when reader active!"); + + BufferSegment? newSegment = _pooledBufferSegment; + if (newSegment is null) + { + if (!_bufferSegmentPool.TryPop(out newSegment)) + { + newSegment = new BufferSegment(); + } + } + else + { + _pooledBufferSegment = null; + } + + InitializeSegmentMemory(sizeHint, newSegment); + + return newSegment; + } + + private BufferSegment CreateSegmentSynchronized() + { + lock (_sync) + { + if (_bufferSegmentPool.TryPop(out BufferSegment? segment)) + { + return segment; + } + } + + return new BufferSegment(); + } + + private void InitializeSegmentMemory(int sizeHint, BufferSegment newSegment) + { int maxSize = _maxPooledBufferSize; if (_pool != null && sizeHint <= maxSize) { @@ -242,8 +280,6 @@ private BufferSegment AllocateSegment(int sizeHint) } _operationState.WritingHeadMemory = newSegment.AvailableMemory; - - return newSegment; } private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue) @@ -255,19 +291,6 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue) return adjustedToMaximumSize; } - private BufferSegment CreateSegmentSynchronized() - { - lock (_sync) - { - if (_bufferSegmentPool.TryPop(out BufferSegment? segment)) - { - return segment; - } - } - - return new BufferSegment(); - } - private void ReturnSegmentUnsynchronized(BufferSegment segment) { Debug.Assert(segment != _operationState.ReadHead, "Returning ReadHead segment that's in use!"); @@ -355,7 +378,7 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) private void PrepareFlush(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) { - var wasEmpty = CommitUnsynchronized(); + bool wasEmpty = CommitUnsynchronized(); // AttachToken before completing reader awaiter in case cancellationToken is already completed _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this); @@ -989,32 +1012,30 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella CompletionData completionData; ValueTask result; - // Allocate whatever the pool gives us so we can write, this also marks the - // state as writing - AllocateWriteHeadIfNeeded(0); - if (source.Length <= _operationState.WritingHeadMemory.Length) - { - source.CopyTo(_operationState.WritingHeadMemory); - - AdvanceCore(source.Length); - } - else - { - // This is the multi segment copy - WriteMultiSegment(source.Span); - } - - lock (_sync) { - _operationState.EndWriteAllocation(); - - PrepareFlush(out completionData, out result, cancellationToken); - + // Allocate whatever the pool gives us so we can write, this also marks the + // state as writing + AllocateWriteHeadIfNeeded(0); if (_writerCompletionException != null) { CompleteWriter(); + ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); + } + + if (source.Length <= _operationState.WritingHeadMemory.Length) + { + source.CopyTo(_operationState.WritingHeadMemory); + + AdvanceCore(source.Length); + } + else + { + // This is the multi segment copy + WriteMultiSegment(source.Span); } + + PrepareFlush(out completionData, out result, cancellationToken); } TrySchedule(_readerScheduler, completionData); From ca315d280c8e0e82565ce92ead837ec526a8f0c4 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 24 May 2020 06:12:34 +0100 Subject: [PATCH 4/4] Extra safety --- .../src/System/IO/Pipelines/Pipe.cs | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 39bbf786451338..04d9d64f741db0 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -123,10 +123,7 @@ private void ResetState() internal Memory GetMemory(int sizeHint) { - if (_writerCompletion.IsCompleted) - { - ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); - } + ThrowIfWriterCompleted(); if (sizeHint < 0) { @@ -134,16 +131,14 @@ internal Memory GetMemory(int sizeHint) } AllocateWriteHeadIfNeeded(sizeHint); + ThrowIfWriterCompleted(); return _operationState.WritingHeadMemory; } internal Span GetSpan(int sizeHint) { - if (_writerCompletion.IsCompleted) - { - ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); - } + ThrowIfWriterCompleted(); if (sizeHint < 0) { @@ -151,10 +146,20 @@ internal Span GetSpan(int sizeHint) } AllocateWriteHeadIfNeeded(sizeHint); + ThrowIfWriterCompleted(); return _operationState.WritingHeadMemory.Span; } + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void ThrowIfWriterCompleted() + { + if (_writerCompletion.IsCompleted) + { + ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); + } + } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private void AllocateWriteHeadIfNeeded(int sizeHint) { @@ -263,6 +268,7 @@ private BufferSegment CreateSegmentSynchronized() return new BufferSegment(); } + [MethodImpl(MethodImplOptions.AggressiveInlining)] private void InitializeSegmentMemory(int sizeHint, BufferSegment newSegment) { int maxSize = _maxPooledBufferSize; @@ -368,7 +374,7 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) ValueTask result; lock (_sync) { - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(_readerScheduler, completionData); @@ -376,7 +382,7 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) return result; } - private void PrepareFlush(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) + private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) { bool wasEmpty = CommitUnsynchronized(); @@ -1004,16 +1010,12 @@ private void GetFlushResult(ref FlushResult result) internal ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken) { - if (_writerCompletion.IsCompleted) - { - ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed(); - } - CompletionData completionData; ValueTask result; lock (_sync) { + ThrowIfWriterCompleted(); // Allocate whatever the pool gives us so we can write, this also marks the // state as writing AllocateWriteHeadIfNeeded(0); @@ -1035,7 +1037,7 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella WriteMultiSegment(source.Span); } - PrepareFlush(out completionData, out result, cancellationToken); + PrepareFlushUnsynchronized(out completionData, out result, cancellationToken); } TrySchedule(_readerScheduler, completionData);