From ae6983e2c77f2676e20aec70fb645a22d0cc5ae6 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 24 May 2020 17:45:06 +0100 Subject: [PATCH] Pipelines remove lock from uncontended path --- .../src/System/IO/Pipelines/Pipe.cs | 52 ++++++++------- .../System/IO/Pipelines/PipeOperationState.cs | 66 ++++++++++++++----- .../tests/PipeWriterTests.cs | 8 +++ 3 files changed, 88 insertions(+), 38 deletions(-) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index b92832c053dad5..0d021e2980c467 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -181,7 +181,24 @@ private void AllocateWriteHeadIfNeeded(int sizeHint) if (!_operationState.IsWritingActive || _writingHeadMemory.Length == 0 || _writingHeadMemory.Length < sizeHint) { - AllocateWriteHeadSynchronized(sizeHint); + // Set wrtiting before checking anything; this is important as AdvanceReader + // can release the write head to free up memory if it considers the Pipe to be idle. + _operationState.BeginWrite(); + + // Ensure a read is not in progress as it may have released the write head, but not yet returned + // the segments in which case we cannot access them without a lock. + if (!_operationState.IsReadingActive && _writingHead == null) + { + // No reader in progress and no existing data; so we don't need the lock to allocate and initalize. + BufferSegment newSegment = AllocateSegment(sizeHint); + // Set all the pointers + _writingHead = _readHead = _readTail = newSegment; + _lastExaminedIndex = 0; + } + else + { + AllocateWriteHeadSynchronized(sizeHint); + } } } @@ -189,13 +206,10 @@ private void AllocateWriteHeadSynchronized(int sizeHint) { lock (_sync) { - _operationState.BeginWrite(); - if (_writingHead == null) { - // We need to allocate memory to write since nobody has written before + // No existing write head so we need to initialize everything. BufferSegment newSegment = AllocateSegment(sizeHint); - // Set all the pointers _writingHead = _readHead = _readTail = newSegment; _lastExaminedIndex = 0; @@ -313,15 +327,14 @@ internal bool CommitUnsynchronized() internal void Advance(int bytes) { - lock (_sync) + // Advance is not under lock, so Write must be active or Read could release + // the write head to free up idle memory before we have increased the unflushed data. + if (!_operationState.IsWritingActive || (uint)bytes > (uint)_writingHeadMemory.Length) { - if ((uint)bytes > (uint)_writingHeadMemory.Length) - { - ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes); - } - - AdvanceCore(bytes); + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes); } + + AdvanceCore(bytes); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -438,8 +451,6 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition(); } - BufferSegment? returnStart = null; - BufferSegment? returnEnd = null; CompletionData completionData = default; @@ -475,6 +486,8 @@ private void AdvanceReader(BufferSegment? consumedSegment, int consumedIndex, Bu } } + BufferSegment? returnStart = null; + BufferSegment? returnEnd = null; if (consumedSegment != null) { if (_readHead == null) @@ -655,7 +668,6 @@ internal ValueTask ReadAsync(CancellationToken token) ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); } - ValueTask result; lock (_sync) { _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); @@ -664,16 +676,12 @@ internal ValueTask ReadAsync(CancellationToken token) if (_readerAwaitable.IsCompleted) { GetReadResult(out ReadResult readResult); - result = new ValueTask(readResult); - } - else - { - // Otherwise it's async - result = new ValueTask(_reader, token: 0); + return new ValueTask(readResult); } } - return result; + // Otherwise it's async + return new ValueTask(_reader, token: 0); } internal bool TryRead(out ReadResult result) diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs index 1271df5a9a11c8..f2734714c550f5 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeOperationState.cs @@ -4,70 +4,104 @@ using System.Diagnostics; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; namespace System.IO.Pipelines { - [DebuggerDisplay("State: {_state}")] + [DebuggerDisplay("State: {State}")] + [StructLayout(LayoutKind.Explicit)] internal struct PipeOperationState { - private State _state; + // Ensure reader and writer data not on same cache line + [FieldOffset(0)] + private WriterData _writerData; + [FieldOffset(128)] + private ReaderData _readerData; [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginRead() { - if ((_state & State.Reading) == State.Reading) + if ((_readerData._readState & ReadState.Reading) != 0) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } - _state |= State.Reading; + _readerData._readState = ReadState.Reading; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginReadTentative() { - if ((_state & State.Reading) == State.Reading) + if ((_readerData._readState & ReadState.Reading) != 0) { ThrowHelper.ThrowInvalidOperationException_AlreadyReading(); } - _state |= State.ReadingTentative; + _readerData._readState = ReadState.ReadingTentative; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndRead() { - if ((_state & State.Reading) != State.Reading && - (_state & State.ReadingTentative) != State.ReadingTentative) + if (_readerData._readState == ReadState.Inactive) { ThrowHelper.ThrowInvalidOperationException_NoReadToComplete(); } - _state &= ~(State.Reading | State.ReadingTentative); + _readerData._readState = ReadState.Inactive; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void BeginWrite() { - _state |= State.Writing; + _writerData._writeState = WriteState.Writing; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public void EndWrite() { - _state &= ~State.Writing; + _writerData._writeState = WriteState.Inactive; } - public bool IsWritingActive => (_state & State.Writing) == State.Writing; + public bool IsWritingActive + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (_writerData._writeState & WriteState.Writing) != 0; + } + + public bool IsReadingActive + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + get => (_readerData._readState & (ReadState.Reading | ReadState.ReadingTentative)) != 0; + } + + private string State => $"WriteState: {_writerData._writeState}; ReadState: {_readerData._readState}"; - public bool IsReadingActive => (_state & State.Reading) == State.Reading; + [StructLayout(LayoutKind.Auto)] + private struct ReaderData + { + public volatile ReadState _readState; + } + + [StructLayout(LayoutKind.Auto)] + private struct WriterData + { + public volatile WriteState _writeState; + } [Flags] - internal enum State : byte + internal enum ReadState : int { + Inactive = 0, Reading = 1, - ReadingTentative = 2, - Writing = 4 + ReadingTentative = 2 + } + + [Flags] + internal enum WriteState : int + { + Inactive = 0, + Writing = 1 } } } diff --git a/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs b/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs index 128ce4c3bb4c2b..f398f80b2ccc8c 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeWriterTests.cs @@ -178,6 +178,14 @@ public void ThrowsOnAdvanceOverMemorySize() [Fact] public void ThrowsOnAdvanceWithNoMemory() + { + Memory buffer = Pipe.Writer.GetMemory(1); + Pipe.Writer.Advance(buffer.Length); + Assert.Throws(() => Pipe.Writer.Advance(1)); + } + + [Fact] + public void ThrowsOnAdvanceWithoutWrite() { PipeWriter buffer = Pipe.Writer; Assert.Throws(() => buffer.Advance(1));