From a62ff0cadebaaf65202a2b31383885be001c805c Mon Sep 17 00:00:00 2001 From: Justin Kotalik Date: Mon, 15 Jul 2019 13:31:54 -0700 Subject: [PATCH 1/4] Revert the input pipe in the DuplexStreamPipeAdapter --- .../Internal/DuplexPipeStreamAdapter.cs | 57 ++++++++++++++++++- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 77f4bfd24e81..54898524e2d0 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -15,7 +15,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal /// internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream { + private readonly Pipe _input; private readonly Pipe _output; + private Task _inputTask; private Task _outputTask; private bool _disposed; private readonly object _disposeLock = new object(); @@ -30,6 +32,14 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions r { Stream = createStream(this); + var inputOptions = new PipeOptions(pool: readerOptions.Pool, + readerScheduler: PipeScheduler.ThreadPool, + writerScheduler: PipeScheduler.Inline, + pauseWriterThreshold: 1, + resumeWriterThreshold: 1, + minimumSegmentSize: readerOptions.Pool.GetMinimumSegmentSize(), + useSynchronizationContext: false); + var outputOptions = new PipeOptions(pool: writerOptions.Pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, @@ -38,7 +48,7 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions r minimumSegmentSize: writerOptions.MinimumBufferSize, useSynchronizationContext: false); - Input = PipeReader.Create(Stream, readerOptions); + _input = new Pipe(inputOptions); // We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions // about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once @@ -50,7 +60,50 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions r public TStream Stream { get; } - public PipeReader Input { get; } + public PipeReader Input + { + get + { + if (_inputTask == null) + { + _inputTask = ReadInputAsync(); + } + return _input.Reader; + } + } + + private async Task ReadInputAsync() + { + try + { + while (true) + { + var memory = _input.Writer.GetMemory(); + + var result = await Stream.ReadAsync(memory); + + if (result == 0) + { + break; + } + + _input.Writer.Advance(result); + var flushResult = await _input.Writer.FlushAsync(); + if (flushResult.IsCompleted) + { + break; + } + } + } + catch (Exception ex) + { + Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(ReadInputAsync)}"); + } + finally + { + _input.Writer.Complete(); + } + } public PipeWriter Output { From 9a3bb951220b90f869533aaed27a746269746347 Mon Sep 17 00:00:00 2001 From: Justin Kotalik Date: Mon, 15 Jul 2019 13:33:46 -0700 Subject: [PATCH 2/4] nits --- .../Internal/DuplexPipeStreamAdapter.cs | 67 ++++++++++--------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 54898524e2d0..122916136f2f 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -72,39 +72,6 @@ public PipeReader Input } } - private async Task ReadInputAsync() - { - try - { - while (true) - { - var memory = _input.Writer.GetMemory(); - - var result = await Stream.ReadAsync(memory); - - if (result == 0) - { - break; - } - - _input.Writer.Advance(result); - var flushResult = await _input.Writer.FlushAsync(); - if (flushResult.IsCompleted) - { - break; - } - } - } - catch (Exception ex) - { - Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(ReadInputAsync)}"); - } - finally - { - _input.Writer.Complete(); - } - } - public PipeWriter Output { get @@ -142,6 +109,40 @@ public override ValueTask DisposeAsync() return new ValueTask(_outputTask); } + private async Task ReadInputAsync() + { + try + { + while (true) + { + var memory = _input.Writer.GetMemory(); + + var result = await Stream.ReadAsync(memory); + + if (result == 0) + { + break; + } + + _input.Writer.Advance(result); + var flushResult = await _input.Writer.FlushAsync(); + if (flushResult.IsCompleted) + { + // flushResult should not be canceled. + break; + } + } + } + catch (Exception ex) + { + Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(ReadInputAsync)}"); + } + finally + { + _input.Writer.Complete(); + } + } + private async Task WriteOutputAsync() { try From ed204d734e2133f9aa2743135ee32214cfe04113 Mon Sep 17 00:00:00 2001 From: Justin Kotalik Date: Mon, 15 Jul 2019 14:59:51 -0700 Subject: [PATCH 3/4] reverting better --- .../Internal/DuplexPipeStreamAdapter.cs | 61 +++++++++++++------ 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 122916136f2f..88fd540a77b6 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -5,6 +5,7 @@ using System.IO; using System.IO.Pipelines; using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal @@ -21,6 +22,7 @@ internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe private Task _outputTask; private bool _disposed; private readonly object _disposeLock = new object(); + private readonly int _minAllocBufferSize; public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream) @@ -28,7 +30,7 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func cre } public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : - base(duplexPipe.Input, duplexPipe.Output) + base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true) { Stream = createStream(this); @@ -48,6 +50,8 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions r minimumSegmentSize: writerOptions.MinimumBufferSize, useSynchronizationContext: false); + _minAllocBufferSize = writerOptions.MinimumBufferSize; + _input = new Pipe(inputOptions); // We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions @@ -66,8 +70,9 @@ public PipeReader Input { if (_inputTask == null) { - _inputTask = ReadInputAsync(); + RunAsync(); } + return _input.Reader; } } @@ -78,68 +83,86 @@ public PipeWriter Output { if (_outputTask == null) { - _outputTask = WriteOutputAsync(); + RunAsync(); } return _output.Writer; } } - public override ValueTask DisposeAsync() + public void RunAsync() + { + _inputTask = ReadInputAsync(); + _outputTask = WriteOutputAsync(); + } + + public override async ValueTask DisposeAsync() { lock (_disposeLock) { if (_disposed) { - return default; + return; } _disposed = true; } - Input.Complete(); _output.Writer.Complete(); + _input.Reader.Complete(); - if (_outputTask == null || _outputTask.IsCompletedSuccessfully) + if (_outputTask == null) { - // Wait for the output task to complete, this ensures that we've copied - // the application data to the underlying stream - return default; + return; } - return new ValueTask(_outputTask); + await _outputTask; + + CancelPendingRead(); + + await _inputTask; } private async Task ReadInputAsync() { + Exception error = null; try { while (true) { - var memory = _input.Writer.GetMemory(); + var outputBuffer = _input.Writer.GetMemory(_minAllocBufferSize); - var result = await Stream.ReadAsync(memory); + var bytesRead = await Stream.ReadAsync(outputBuffer); + _input.Writer.Advance(bytesRead); - if (result == 0) + if (bytesRead == 0) { + // FIN break; } - _input.Writer.Advance(result); - var flushResult = await _input.Writer.FlushAsync(); - if (flushResult.IsCompleted) + var result = await _input.Writer.FlushAsync(); + + if (result.IsCompleted) { // flushResult should not be canceled. break; } } + + } + catch (OperationCanceledException ex) + { + // Propagate the exception if it's ConnectionAbortedException + error = ex as ConnectionAbortedException; } catch (Exception ex) { - Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(ReadInputAsync)}"); + // Don't rethrow the exception. It should be handled by the Pipeline consumer. + error = ex; } finally { - _input.Writer.Complete(); + _input.Writer.Complete(error); } } From 20ae3ecbecd7c1f88a37abaf5a76b5ccf70b8400 Mon Sep 17 00:00:00 2001 From: Justin Kotalik Date: Mon, 15 Jul 2019 17:16:45 -0700 Subject: [PATCH 4/4] Update DuplexPipeStreamAdapter.cs --- .../Middleware/Internal/DuplexPipeStreamAdapter.cs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 88fd540a77b6..592eeb67c7d4 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -115,11 +115,17 @@ public override async ValueTask DisposeAsync() return; } - await _outputTask; - + if (_outputTask != null) + { + await _outputTask; + } + CancelPendingRead(); - - await _inputTask; + + if (_inputTask != null) + { + await _inputTask; + } } private async Task ReadInputAsync()