diff --git a/src/libraries/System.IO.Pipelines/System.IO.Pipelines.sln b/src/libraries/System.IO.Pipelines/System.IO.Pipelines.sln index 88a6f208f09bfc..f0ca3ca4140c7b 100644 --- a/src/libraries/System.IO.Pipelines/System.IO.Pipelines.sln +++ b/src/libraries/System.IO.Pipelines/System.IO.Pipelines.sln @@ -1,4 +1,8 @@ -Microsoft Visual Studio Solution File, Format Version 12.00 + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.31223.327 +MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestUtilities", "..\Common\tests\TestUtilities\TestUtilities.csproj", "{705D4D3B-CE99-4FAC-AB61-07AD8FD3FF3A}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Win32.Registry", "..\Microsoft.Win32.Registry\ref\Microsoft.Win32.Registry.csproj", "{9AF9D7C9-527B-4BB4-B1A3-A26DEAFB3DC0}" @@ -23,18 +27,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ref", "ref", "{A7DEEB5B-C33 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{037F66C3-C136-4DB5-9DFA-E5A3D53D32F0}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "StreamConformanceTests", "..\Common\tests\StreamConformanceTests\StreamConformanceTests.csproj", "{1E18A8F9-3602-4948-8764-BC7C85840B80}" +EndProject Global - GlobalSection(NestedProjects) = preSolution - {705D4D3B-CE99-4FAC-AB61-07AD8FD3FF3A} = {AB8B533C-817A-4010-9FFD-0206D41307D0} - {1B97BE49-ACAD-4DC9-B195-FE23C3DE5D08} = {AB8B533C-817A-4010-9FFD-0206D41307D0} - {9AF9D7C9-527B-4BB4-B1A3-A26DEAFB3DC0} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} - {DC148B85-05DB-43B2-B2AF-958C305C2C0A} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} - {C6EA39FC-B265-4B3E-B5BA-9D0D2C601691} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} - {E0AFC8E8-C697-40B3-A8AD-52788AE1B753} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} - {90FCF3EB-F36D-4D39-8A4A-623497F54700} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} - {1DD520A2-ED75-4889-848E-DBDC7BE41873} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0} - {FFDC73EC-53C9-4B6E-B468-C477039742D8} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0} - EndGlobalSection GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU Release|Any CPU = Release|Any CPU @@ -76,10 +71,26 @@ Global {90FCF3EB-F36D-4D39-8A4A-623497F54700}.Debug|Any CPU.Build.0 = Debug|Any CPU {90FCF3EB-F36D-4D39-8A4A-623497F54700}.Release|Any CPU.ActiveCfg = Release|Any CPU {90FCF3EB-F36D-4D39-8A4A-623497F54700}.Release|Any CPU.Build.0 = Release|Any CPU + {1E18A8F9-3602-4948-8764-BC7C85840B80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1E18A8F9-3602-4948-8764-BC7C85840B80}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1E18A8F9-3602-4948-8764-BC7C85840B80}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1E18A8F9-3602-4948-8764-BC7C85840B80}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {705D4D3B-CE99-4FAC-AB61-07AD8FD3FF3A} = {AB8B533C-817A-4010-9FFD-0206D41307D0} + {9AF9D7C9-527B-4BB4-B1A3-A26DEAFB3DC0} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} + {DC148B85-05DB-43B2-B2AF-958C305C2C0A} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} + {1DD520A2-ED75-4889-848E-DBDC7BE41873} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0} + {1B97BE49-ACAD-4DC9-B195-FE23C3DE5D08} = {AB8B533C-817A-4010-9FFD-0206D41307D0} + {C6EA39FC-B265-4B3E-B5BA-9D0D2C601691} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} + {FFDC73EC-53C9-4B6E-B468-C477039742D8} = {037F66C3-C136-4DB5-9DFA-E5A3D53D32F0} + {E0AFC8E8-C697-40B3-A8AD-52788AE1B753} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} + {90FCF3EB-F36D-4D39-8A4A-623497F54700} = {A7DEEB5B-C332-4307-852F-8859096F4DCD} + {1E18A8F9-3602-4948-8764-BC7C85840B80} = {AB8B533C-817A-4010-9FFD-0206D41307D0} + EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2034EAE9-84E4-42C3-8C1F-AB515D313D5E} EndGlobalSection diff --git a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs index 4f35df94ce11a2..28a8f6d4286748 100644 --- a/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs +++ b/src/libraries/System.IO.Pipelines/ref/System.IO.Pipelines.cs @@ -49,11 +49,13 @@ protected PipeReader() { } public virtual System.Threading.Tasks.ValueTask CompleteAsync(System.Exception? exception = null) { throw null; } public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Pipelines.PipeWriter destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task CopyToAsync(System.IO.Stream destination, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; } public static System.IO.Pipelines.PipeReader Create(System.Buffers.ReadOnlySequence sequence) { throw null; } + public static System.IO.Pipelines.PipeReader Create(System.IO.Stream stream, System.IO.Pipelines.StreamPipeReaderOptions? readerOptions = null) { throw null; } [System.ObsoleteAttribute("OnWriterCompleted may not be invoked on all implementations of PipeReader. This will be removed in a future release.")] public virtual void OnWriterCompleted(System.Action callback, object? state) { } public abstract System.Threading.Tasks.ValueTask ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); + public System.Threading.Tasks.ValueTask ReadAtLeastAsync(int minimumSize, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + protected virtual System.Threading.Tasks.ValueTask ReadAtLeastAsyncCore(int minimumSize, System.Threading.CancellationToken cancellationToken) { throw null; } public abstract bool TryRead(out System.IO.Pipelines.ReadResult result); } public abstract partial class PipeScheduler diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeReader.cs index 15eed360116534..2cbb0ff84800d1 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeReader.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeReader.cs @@ -23,6 +23,8 @@ public DefaultPipeReader(Pipe pipe) public override ValueTask ReadAsync(CancellationToken cancellationToken = default) => _pipe.ReadAsync(cancellationToken); + protected override ValueTask ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken) => _pipe.ReadAtLeastAsync(minimumBytes, cancellationToken); + public override void AdvanceTo(SequencePosition consumed) => _pipe.AdvanceReader(consumed); public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _pipe.AdvanceReader(consumed, examined); diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 1abb6be8ca1553..14f0092125ad4e 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -71,6 +71,7 @@ public sealed partial class Pipe // The extent of the bytes available to the PipeReader to consume private BufferSegment? _readTail; private int _readTailIndex; + private int _minimumReadBytes; // The write head which is the extent of the PipeWriter's written bytes private BufferSegment? _writingHead; @@ -274,7 +275,7 @@ internal bool CommitUnsynchronized() if (_unflushedBytes == 0) { // Nothing written to commit - return true; + return false; } // Update the writing head @@ -288,8 +289,18 @@ internal bool CommitUnsynchronized() long oldLength = _unconsumedBytes; _unconsumedBytes += _unflushedBytes; - // Do not reset if reader is complete - if (PauseWriterThreshold > 0 && + bool resumeReader = true; + + if (_unconsumedBytes < _minimumReadBytes) + { + // Don't yield the reader if we haven't written enough + resumeReader = false; + } + // We only apply back pressure if the reader isn't paused. This is important + // because if it is blocked then this could cause a deadlock (if resumeReader is false). + // If we are resuming the reader, then we can look at the pause threshold to know + // if we should pause the writer. + else if (PauseWriterThreshold > 0 && oldLength < PauseWriterThreshold && _unconsumedBytes >= PauseWriterThreshold && !_readerCompletion.IsCompleted) @@ -300,7 +311,7 @@ internal bool CommitUnsynchronized() _unflushedBytes = 0; _writingHeadBytesBuffered = 0; - return false; + return resumeReader; } internal void Advance(int bytes) @@ -346,7 +357,7 @@ internal ValueTask FlushAsync(CancellationToken cancellationToken) private void PrepareFlush(out CompletionData completionData, out ValueTask result, CancellationToken cancellationToken) { - var wasEmpty = CommitUnsynchronized(); + var completeReader = CommitUnsynchronized(); // AttachToken before completing reader awaiter in case cancellationToken is already completed _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this); @@ -367,7 +378,7 @@ private void PrepareFlush(out CompletionData completionData, out ValueTask callback, object? st } } + internal ValueTask ReadAtLeastAsync(int minimumBytes, CancellationToken token) + { + if (_readerCompletion.IsCompleted) + { + ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed(); + } + + CompletionData completionData = default; + ValueTask result; + lock (SyncObj) + { + _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this); + + // If the awaitable is already complete then return the value result directly + if (_readerAwaitable.IsCompleted) + { + GetReadResult(out ReadResult readResult); + + // Short circuit if we have the data or if we enter another terminal state + if (_unconsumedBytes >= minimumBytes || readResult.IsCanceled || readResult.IsCompleted) + { + return new ValueTask(readResult); + } + + // We don't have enough data so we need to reset the reader awaitable + _readerAwaitable.SetUncompleted(); + + // We also need to flip the reading state off + _operationState.EndRead(); + } + + // If the writer is currently paused and we are about the wait for more data then this would deadlock. + // The writer is paused at the pause threshold but the reader needs a minimum amount in order to make progress. + // We resume the writer so that we can unblock this read. + if (!_writerAwaitable.IsCompleted) + { + _writerAwaitable.Complete(out completionData); + } + + // Set the minimum read bytes if we need to wait + _minimumReadBytes = minimumBytes; + + // Otherwise it's async + result = new ValueTask(_reader, token: 0); + } + + TrySchedule(WriterScheduler, in completionData); + + return result; + } + internal ValueTask ReadAsync(CancellationToken token) { if (_readerCompletion.IsCompleted) @@ -889,6 +951,9 @@ private void GetReadResult(out ReadResult result) { _operationState.BeginRead(); } + + // Reset the minimum read bytes when read yields + _minimumReadBytes = 0; } internal ValueTaskSourceStatus GetFlushAsyncStatus() diff --git a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs index 656e1d97935422..165b402aa3f03e 100644 --- a/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs +++ b/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/PipeReader.cs @@ -23,6 +23,43 @@ public abstract partial class PipeReader /// A representing the asynchronous read operation. public abstract ValueTask ReadAsync(CancellationToken cancellationToken = default); + /// Asynchronously reads a sequence of bytes from the current . + /// The minimum length that needs to be buffered in order to for the call to return. + /// The token to monitor for cancellation requests. The default value is . + /// A representing the asynchronous read operation. + /// The call returns if the has read the minimumLength specified, or is cancelled or completed. + public ValueTask ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = default) + { + if (minimumSize < 0) + { + ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize); + } + + return ReadAtLeastAsyncCore(minimumSize, cancellationToken); + } + + /// Asynchronously reads a sequence of bytes from the current . + /// The minimum length that needs to be buffered in order to for the call to return. + /// The token to monitor for cancellation requests. The default value is . + /// A representing the asynchronous read operation. + /// The call returns if the has read the minimumLength specified, or is cancelled or completed. + protected virtual async ValueTask ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken) + { + while (true) + { + ReadResult result = await ReadAsync(cancellationToken).ConfigureAwait(false); + ReadOnlySequence buffer = result.Buffer; + + if (buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled) + { + return result; + } + + // Keep buffering until we get more data + AdvanceTo(buffer.Start, buffer.End); + } + } + /// Moves forward the pipeline's read cursor to after the consumed data, marking the data as processed. /// Marks the extent of the data that has been successfully processed. /// The memory for the consumed data will be released and no longer available. diff --git a/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs b/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs index 3a7732b64efcb0..aa82ad48ac8c0a 100644 --- a/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/BackpressureTests.cs @@ -203,6 +203,32 @@ public void FlushAsyncReturnsNonCompletedSizeWhenCommitOverTheLimit() Assert.False(flushAsync.IsCompleted); } + [Fact] + public async Task ReadAtLeastAsyncUnblocksWriterIfMinimumlowerThanResumeThreshold() + { + PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(PauseWriterThreshold); + ValueTask flushAsync = writableBuffer.FlushAsync(); + Assert.False(flushAsync.IsCompleted); + + ValueTask readAsync = _pipe.Reader.ReadAtLeastAsync(PauseWriterThreshold * 3); + + Assert.False(readAsync.IsCompleted); + + // This should unblock the flush + Assert.True(flushAsync.IsCompleted); + + for (int i = 0; i < 2; i++) + { + writableBuffer = _pipe.Writer.WriteEmpty(PauseWriterThreshold); + flushAsync = writableBuffer.FlushAsync(); + Assert.True(flushAsync.IsCompleted); + } + + var result = await readAsync; + Assert.Equal(PauseWriterThreshold * 3, result.Buffer.Length); + _pipe.Reader.AdvanceTo(result.Buffer.End); + } + [Fact] public async Task FlushAsyncThrowsIfReaderCompletedWithException() { diff --git a/src/libraries/System.IO.Pipelines/tests/Infrastructure/BasePipeReader.cs b/src/libraries/System.IO.Pipelines/tests/Infrastructure/BasePipeReader.cs new file mode 100644 index 00000000000000..43b639a39388df --- /dev/null +++ b/src/libraries/System.IO.Pipelines/tests/Infrastructure/BasePipeReader.cs @@ -0,0 +1,28 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Threading; +using System.Threading.Tasks; + +namespace System.IO.Pipelines.Tests +{ + // This is a PipeReader implementation that does not override any of the virtual methods. + // The intent is to test the base implementation without having to rewrite the base functionality + // of the PipeReader. + public class BasePipeReader : PipeReader + { + private readonly PipeReader _reader; + + public BasePipeReader(PipeReader reader) + { + _reader = reader; + } + + public override void AdvanceTo(SequencePosition consumed) => _reader.AdvanceTo(consumed); + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) => _reader.AdvanceTo(consumed, examined); + public override void CancelPendingRead() => _reader.CancelPendingRead(); + public override void Complete(Exception? exception = null) => _reader.Complete(exception); + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) => _reader.ReadAsync(cancellationToken); + public override bool TryRead(out ReadResult result) => _reader.TryRead(out result); + } +} diff --git a/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs b/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs index 6ad848a5c86268..953a264b44d0eb 100644 --- a/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs +++ b/src/libraries/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs @@ -50,6 +50,76 @@ public async Task CanReadAndWrite() _pipe.Reader.AdvanceTo(buffer.End); } + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task CanWriteAndReadAtLeast(bool baseImplementation) + { + byte[] bytes = Encoding.ASCII.GetBytes("Hello World"); + var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader; + + await _pipe.Writer.WriteAsync(bytes); + ReadResult result = await reader.ReadAtLeastAsync(11); + ReadOnlySequence buffer = result.Buffer; + + Assert.Equal(11, buffer.Length); + Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray())); + + reader.AdvanceTo(buffer.End); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ReadAtLeastShouldNotCompleteIfWriterWroteLessThanMinimum(bool baseImplementation) + { + byte[] bytes = Encoding.ASCII.GetBytes("Hello World"); + var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader; + + await _pipe.Writer.WriteAsync(bytes.AsMemory(0, 5)); + ValueTask task = reader.ReadAtLeastAsync(11); + + Assert.False(task.IsCompleted); + + await _pipe.Writer.WriteAsync(bytes.AsMemory(5)); + + ReadResult result = await task; + + ReadOnlySequence buffer = result.Buffer; + + Assert.Equal(11, buffer.Length); + Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray())); + + reader.AdvanceTo(buffer.End); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task CanAlternateReadAtLeastAndRead(bool baseImplementation) + { + byte[] bytes = Encoding.ASCII.GetBytes("Hello World"); + var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader; + + await _pipe.Writer.WriteAsync(bytes.AsMemory(0, 5)); + ReadResult result = await reader.ReadAtLeastAsync(3); + ReadOnlySequence buffer = result.Buffer; + + Assert.Equal(5, buffer.Length); + Assert.Equal("Hello", Encoding.ASCII.GetString(buffer.ToArray())); + + reader.AdvanceTo(buffer.End); + + await _pipe.Writer.WriteAsync(bytes.AsMemory(5)); + result = await reader.ReadAsync(); + buffer = result.Buffer; + + Assert.Equal(6, buffer.Length); + Assert.Equal(" World", Encoding.ASCII.GetString(buffer.ToArray())); + + reader.AdvanceTo(buffer.End); + } + [Fact] public async Task AdvanceResetsCommitHeadIndex() { @@ -593,6 +663,26 @@ public void TryReadAfterWriterCompleteReturnsTrue() _pipe.Reader.AdvanceTo(result.Buffer.End); } + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ReadAtLeastReturnsIfCompleted(bool baseImplementation) + { + var reader = baseImplementation ? new BasePipeReader(_pipe.Reader) : _pipe.Reader; + + _pipe.Writer.Complete(); + + // Make sure we get the same results (state transitions are working) + for (int i = 0; i < 3; i++) + { + ReadResult result = await reader.ReadAtLeastAsync(100); + + Assert.True(result.IsCompleted); + + reader.AdvanceTo(result.Buffer.End); + } + } + [Fact] public void WhenTryReadReturnsFalseDontNeedToCallAdvance() { diff --git a/src/libraries/System.IO.Pipelines/tests/ReadAsyncCancellationTests.cs b/src/libraries/System.IO.Pipelines/tests/ReadAsyncCancellationTests.cs index 0e9fe23999d82b..4ed6da1eeb05ba 100644 --- a/src/libraries/System.IO.Pipelines/tests/ReadAsyncCancellationTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/ReadAsyncCancellationTests.cs @@ -370,6 +370,15 @@ public void ReadAsyncThrowsIfPassedCanceledCancellationToken() Assert.Throws(() => Pipe.Reader.ReadAsync(cancellationTokenSource.Token)); } + [Fact] + public void ReadAtLeastAsyncThrowsIfPassedCanceledCancellationToken() + { + var cancellationTokenSource = new CancellationTokenSource(); + cancellationTokenSource.Cancel(); + + Assert.Throws(() => Pipe.Reader.ReadAtLeastAsync(0, cancellationTokenSource.Token)); + } + [Fact] public async Task ReadAsyncWithNewCancellationTokenNotAffectedByPrevious() { @@ -430,6 +439,29 @@ public async Task WriteAndCancellingPendingReadBeforeReadAsync() Pipe.Reader.AdvanceTo(buffer.End, buffer.End); } + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task WriteAndCancellingPendingReadBeforeReadAtLeastAsync(bool baseImplementation) + { + var reader = baseImplementation ? new BasePipeReader(Pipe.Reader) : Pipe.Reader; + + byte[] bytes = Encoding.ASCII.GetBytes("Hello World"); + PipeWriter output = Pipe.Writer; + output.Write(bytes); + await output.FlushAsync(); + + reader.CancelPendingRead(); + + ReadResult result = await reader.ReadAtLeastAsync(1000); + ReadOnlySequence buffer = result.Buffer; + + Assert.False(result.IsCompleted); + Assert.True(result.IsCanceled); + Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray())); + reader.AdvanceTo(buffer.End); + } + [Fact] public async Task ReadAsyncIsNotCancelledWhenCancellationTokenCancelledBetweenReads() { diff --git a/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs b/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs index 69fb7048ac52d6..f0944f511dd263 100644 --- a/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs +++ b/src/libraries/System.IO.Pipelines/tests/StreamPipeReaderTests.cs @@ -31,6 +31,23 @@ public async Task CanRead() reader.Complete(); } + [Fact] + public async Task CanReadAtLeast() + { + var stream = new MemoryStream(Encoding.ASCII.GetBytes("Hello World")); + var reader = PipeReader.Create(stream); + + ReadResult readResult = await reader.ReadAtLeastAsync(10); + ReadOnlySequence buffer = readResult.Buffer; + + Assert.Equal(11, buffer.Length); + Assert.True(buffer.IsSingleSegment); + Assert.Equal("Hello World", Encoding.ASCII.GetString(buffer.ToArray())); + + reader.AdvanceTo(buffer.End); + reader.Complete(); + } + [Fact] public async Task TryReadReturnsTrueIfBufferedBytesAndNotExaminedEverything() { diff --git a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj index fe6ecb26ae0a92..7a409919fdf9aa 100644 --- a/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj +++ b/src/libraries/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj @@ -4,6 +4,7 @@ $(NetCoreAppCurrent);net461 +