From 826c7885b3309b367cb5f5e3daa984fa01ddf20b Mon Sep 17 00:00:00 2001 From: David Fowler Date: Tue, 25 Jun 2019 22:55:22 -0700 Subject: [PATCH 1/8] Revert back to copying data to pipes --- .../Internal/DuplexPipeStreamAdapter.cs | 179 ++++++++++++++++-- 1 file changed, 166 insertions(+), 13 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 5b5cc019057c..bb08a03af1aa 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -4,7 +4,10 @@ using System; using System.IO; using System.IO.Pipelines; +using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; +using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal { @@ -14,36 +17,186 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal /// internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream { + private Task _inputTask; + private Task _outputTask; + private int _minAllocBufferSize; + public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream) { } - public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : base(duplexPipe.Input, duplexPipe.Output) + public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : + base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true) { Stream = createStream(this); - Input = PipeReader.Create(Stream, readerOptions); - Output = PipeWriter.Create(Stream, writerOptions); + + var inputOptions = new PipeOptions(pool: readerOptions.Pool, + readerScheduler: PipeScheduler.Inline, + writerScheduler: PipeScheduler.Inline, + minimumSegmentSize: readerOptions.BufferSize, + pauseWriterThreshold: 1, + resumeWriterThreshold: 1, + useSynchronizationContext: false); + + var outputOptions = new PipeOptions(pool: writerOptions.Pool, + readerScheduler: PipeScheduler.Inline, + writerScheduler: PipeScheduler.Inline, + pauseWriterThreshold: 1, + resumeWriterThreshold: 1, + minimumSegmentSize: writerOptions.MinimumBufferSize, + useSynchronizationContext: false); + + Input = new Pipe(inputOptions); + Output = new Pipe(outputOptions); + + _minAllocBufferSize = readerOptions.MinimumReadSize; } + public ILogger Log { get; private set; } + public TStream Stream { get; } - public PipeReader Input { get; } + private Pipe Input { get; } + + private Pipe Output { get; } + + PipeReader IDuplexPipe.Input + { + get + { + if (_inputTask == null) + { + _inputTask = ReadInputAsync(); + } + return Input.Reader; + } + } + + PipeWriter IDuplexPipe.Output + { + get + { + if (_outputTask == null) + { + _outputTask = WriteOutputAsync(); + } + + return Output.Writer; + } + } + + public override async ValueTask DisposeAsync() + { + Output.Writer.Complete(); + Input.Reader.Complete(); + - public PipeWriter Output { get; } + if (_outputTask != null) + { + // Wait for the output task to complete, this ensures that we've copied + // the application data to the underlying stream + await _outputTask; + } + + // Cancel the underlying stream so that the input task yields + CancelPendingRead(); + + if (_inputTask != null) + { + // The input task should yield now that we've cancelled it + await _inputTask; + } + } - protected override void Dispose(bool disposing) + private async Task WriteOutputAsync() { - Input.Complete(); - Output.Complete(); - base.Dispose(disposing); + try + { + while (true) + { + var result = await Output.Reader.ReadAsync(); + var buffer = result.Buffer; + + try + { + if (buffer.IsEmpty) + { + if (result.IsCompleted) + { + break; + } + await Stream.FlushAsync(); + } + else if (buffer.IsSingleSegment) + { + await Stream.WriteAsync(buffer.First); + } + else + { + foreach (var memory in buffer) + { + await Stream.WriteAsync(memory); + } + } + } + finally + { + Output.Reader.AdvanceTo(buffer.End); + } + } + } + catch (Exception ex) + { + Log.LogError(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}"); + } + finally + { + Output.Reader.Complete(); + } } - public override ValueTask DisposeAsync() + private async Task ReadInputAsync() { - Input.Complete(); - Output.Complete(); - return base.DisposeAsync(); + Exception error = null; + + try + { + while (true) + { + var outputBuffer = Input.Writer.GetMemory(_minAllocBufferSize); + var bytesRead = await Stream.ReadAsync(outputBuffer); + Input.Writer.Advance(bytesRead); + + if (bytesRead == 0) + { + // FIN + break; + } + + var result = await Input.Writer.FlushAsync(); + + if (result.IsCompleted) + { + break; + } + } + } + catch (OperationCanceledException ex) + { + // Propagate the exception if it's ConnectionAbortedException + error = ex as ConnectionAbortedException; + } + catch (Exception ex) + { + // Don't rethrow the exception. It should be handled by the Pipeline consumer. + error = ex; + } + finally + { + Input.Writer.Complete(error); + } } } } + From e2c18dbc5265e3e369ecc00cecd7f0e14615035f Mon Sep 17 00:00:00 2001 From: David Fowler Date: Wed, 26 Jun 2019 09:23:56 -0700 Subject: [PATCH 2/8] Replace the output pipe only --- .../Internal/DuplexPipeStreamAdapter.cs | 88 ++----------------- 1 file changed, 5 insertions(+), 83 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index bb08a03af1aa..9d08771dc585 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -4,9 +4,7 @@ using System; using System.IO; using System.IO.Pipelines; -using System.Threading; using System.Threading.Tasks; -using Microsoft.AspNetCore.Connections; using Microsoft.Extensions.Logging; namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal @@ -17,9 +15,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal /// internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream { - private Task _inputTask; private Task _outputTask; - private int _minAllocBufferSize; public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream) @@ -27,18 +23,10 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func cre } public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func createStream) : - base(duplexPipe.Input, duplexPipe.Output, throwOnCancelled: true) + base(duplexPipe.Input, duplexPipe.Output) { Stream = createStream(this); - var inputOptions = new PipeOptions(pool: readerOptions.Pool, - readerScheduler: PipeScheduler.Inline, - writerScheduler: PipeScheduler.Inline, - minimumSegmentSize: readerOptions.BufferSize, - pauseWriterThreshold: 1, - resumeWriterThreshold: 1, - useSynchronizationContext: false); - var outputOptions = new PipeOptions(pool: writerOptions.Pool, readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, @@ -47,32 +35,18 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions r minimumSegmentSize: writerOptions.MinimumBufferSize, useSynchronizationContext: false); - Input = new Pipe(inputOptions); + Input = PipeReader.Create(Stream, readerOptions); Output = new Pipe(outputOptions); - - _minAllocBufferSize = readerOptions.MinimumReadSize; } public ILogger Log { get; private set; } public TStream Stream { get; } - private Pipe Input { get; } - private Pipe Output { get; } - PipeReader IDuplexPipe.Input - { - get - { - if (_inputTask == null) - { - _inputTask = ReadInputAsync(); - } - return Input.Reader; - } - } - + public PipeReader Input { get; } + PipeWriter IDuplexPipe.Output { get @@ -88,9 +62,8 @@ PipeWriter IDuplexPipe.Output public override async ValueTask DisposeAsync() { + Input.Complete(); Output.Writer.Complete(); - Input.Reader.Complete(); - if (_outputTask != null) { @@ -98,15 +71,6 @@ public override async ValueTask DisposeAsync() // the application data to the underlying stream await _outputTask; } - - // Cancel the underlying stream so that the input task yields - CancelPendingRead(); - - if (_inputTask != null) - { - // The input task should yield now that we've cancelled it - await _inputTask; - } } private async Task WriteOutputAsync() @@ -155,48 +119,6 @@ private async Task WriteOutputAsync() Output.Reader.Complete(); } } - - private async Task ReadInputAsync() - { - Exception error = null; - - try - { - while (true) - { - var outputBuffer = Input.Writer.GetMemory(_minAllocBufferSize); - var bytesRead = await Stream.ReadAsync(outputBuffer); - Input.Writer.Advance(bytesRead); - - if (bytesRead == 0) - { - // FIN - break; - } - - var result = await Input.Writer.FlushAsync(); - - if (result.IsCompleted) - { - break; - } - } - } - catch (OperationCanceledException ex) - { - // Propagate the exception if it's ConnectionAbortedException - error = ex as ConnectionAbortedException; - } - catch (Exception ex) - { - // Don't rethrow the exception. It should be handled by the Pipeline consumer. - error = ex; - } - finally - { - Input.Writer.Complete(error); - } - } } } From c9060f72b8da28c270a0d1a32dc088f7086596f0 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Tue, 25 Jun 2019 23:40:16 -0700 Subject: [PATCH 3/8] Don't complete the connection pipe in Http2FrameWriter - This leads to trunated data in some cases. Instead just yield the middleware so we can be sure no more user code is running (Http1OutputProducer does this as well). There are still cases where a misbeaving application that doesn't properly await writes gets cut off but that will be fixed in the SteamPipeWriter itself. - Updated tests --- .../Core/src/Internal/Http2/Http2FrameWriter.cs | 1 - .../Http2/Http2TestBase.cs | 17 ++++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs index 630927bb3ff6..593408337142 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs @@ -89,7 +89,6 @@ public void Complete() _completed = true; _connectionOutputFlowControl.Abort(); - _outputWriter.Complete(); } } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs index 543a9dc2896b..d9fd7fe6f52f 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs @@ -467,7 +467,22 @@ protected async Task InitializeConnectionAsync(RequestDelegate application, int CreateConnection(); } - _connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application)); + var connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application)); + + async Task CompletePipeOnTaskCompletion() + { + try + { + await connectionTask; + } + finally + { + _pair.Transport.Input.Complete(); + _pair.Transport.Output.Complete(); + } + } + + _connectionTask = CompletePipeOnTaskCompletion(); await SendPreambleAsync().ConfigureAwait(false); await SendSettingsAsync(); From c9918175338d252aba28b5d7b196c84c45937c30 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Wed, 26 Jun 2019 09:35:38 -0700 Subject: [PATCH 4/8] Set a logger --- .../Core/src/Middleware/HttpsConnectionMiddleware.cs | 10 ++++++++-- .../src/Middleware/Internal/DuplexPipeStreamAdapter.cs | 4 ++-- .../Core/src/Middleware/LoggingConnectionMiddleware.cs | 1 + 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs b/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs index 2449fc514629..7c34e025c6e8 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs @@ -103,7 +103,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context) if (_options.ClientCertificateMode == ClientCertificateMode.NoCertificate) { - sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions); + sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions) + { + Log = _logger + }; certificateRequired = false; } else @@ -140,7 +143,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context) } return true; - })); + })) + { + Log = _logger + }; certificateRequired = true; } diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 9d08771dc585..79f2670a2469 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -39,7 +39,7 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions r Output = new Pipe(outputOptions); } - public ILogger Log { get; private set; } + public ILogger Log { get; set; } public TStream Stream { get; } @@ -112,7 +112,7 @@ private async Task WriteOutputAsync() } catch (Exception ex) { - Log.LogError(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}"); + Log?.LogError(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}"); } finally { diff --git a/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs b/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs index ed9ffb819c7b..5ce9f6df510f 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs @@ -44,6 +44,7 @@ private class LoggingDuplexPipe : DuplexPipeStreamAdapter public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) : base(transport, stream => new LoggingStream(stream, logger)) { + Log = logger; } } } From 87be120ed6b1cd391d0b08c55745b3956e676d65 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Wed, 26 Jun 2019 10:03:48 -0700 Subject: [PATCH 5/8] Small nits --- .../Internal/DuplexPipeStreamAdapter.cs | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 79f2670a2469..ea401010e1fb 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -15,6 +15,7 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal /// internal class DuplexPipeStreamAdapter : DuplexPipeStream, IDuplexPipe where TStream : Stream { + private readonly Pipe _output; private Task _outputTask; public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func createStream) : @@ -36,18 +37,20 @@ public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions r useSynchronizationContext: false); Input = PipeReader.Create(Stream, readerOptions); - Output = new Pipe(outputOptions); + + // We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions + // about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once + // those patterns are fixed. + _output = new Pipe(outputOptions); } public ILogger Log { get; set; } public TStream Stream { get; } - private Pipe Output { get; } - public PipeReader Input { get; } - - PipeWriter IDuplexPipe.Output + + public PipeWriter Output { get { @@ -56,14 +59,14 @@ PipeWriter IDuplexPipe.Output _outputTask = WriteOutputAsync(); } - return Output.Writer; + return _output.Writer; } } public override async ValueTask DisposeAsync() { Input.Complete(); - Output.Writer.Complete(); + _output.Writer.Complete(); if (_outputTask != null) { @@ -79,7 +82,7 @@ private async Task WriteOutputAsync() { while (true) { - var result = await Output.Reader.ReadAsync(); + var result = await _output.Reader.ReadAsync(); var buffer = result.Buffer; try @@ -90,6 +93,7 @@ private async Task WriteOutputAsync() { break; } + await Stream.FlushAsync(); } else if (buffer.IsSingleSegment) @@ -106,7 +110,7 @@ private async Task WriteOutputAsync() } finally { - Output.Reader.AdvanceTo(buffer.End); + _output.Reader.AdvanceTo(buffer.End); } } } @@ -116,7 +120,7 @@ private async Task WriteOutputAsync() } finally { - Output.Reader.Complete(); + _output.Reader.Complete(); } } } From 0ab532b789f97f7368c44ef6eb649ef3a54b8733 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Wed, 26 Jun 2019 10:11:42 -0700 Subject: [PATCH 6/8] PR feedback and simplification --- .../Internal/DuplexPipeStreamAdapter.cs | 34 +------------------ 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index ea401010e1fb..97af0527bf13 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -80,39 +80,7 @@ private async Task WriteOutputAsync() { try { - while (true) - { - var result = await _output.Reader.ReadAsync(); - var buffer = result.Buffer; - - try - { - if (buffer.IsEmpty) - { - if (result.IsCompleted) - { - break; - } - - await Stream.FlushAsync(); - } - else if (buffer.IsSingleSegment) - { - await Stream.WriteAsync(buffer.First); - } - else - { - foreach (var memory in buffer) - { - await Stream.WriteAsync(memory); - } - } - } - finally - { - _output.Reader.AdvanceTo(buffer.End); - } - } + await _output.Reader.CopyToAsync(Stream); } catch (Exception ex) { From 63bad1002f4b71b49f16478b1d211daa031ebc41 Mon Sep 17 00:00:00 2001 From: David Fowler Date: Wed, 26 Jun 2019 11:42:31 -0700 Subject: [PATCH 7/8] Remove CTA --- .../Internal/DuplexPipeStreamAdapter.cs | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index 97af0527bf13..ea401010e1fb 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -80,7 +80,39 @@ private async Task WriteOutputAsync() { try { - await _output.Reader.CopyToAsync(Stream); + while (true) + { + var result = await _output.Reader.ReadAsync(); + var buffer = result.Buffer; + + try + { + if (buffer.IsEmpty) + { + if (result.IsCompleted) + { + break; + } + + await Stream.FlushAsync(); + } + else if (buffer.IsSingleSegment) + { + await Stream.WriteAsync(buffer.First); + } + else + { + foreach (var memory in buffer) + { + await Stream.WriteAsync(memory); + } + } + } + finally + { + _output.Reader.AdvanceTo(buffer.End); + } + } } catch (Exception ex) { From 1891f35a14252f2f9b33218bd2c449f0f3c88488 Mon Sep 17 00:00:00 2001 From: Justin Kotalik Date: Wed, 26 Jun 2019 14:48:20 -0700 Subject: [PATCH 8/8] Critical --- .../Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs index ea401010e1fb..0a0d09384c21 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs @@ -116,7 +116,7 @@ private async Task WriteOutputAsync() } catch (Exception ex) { - Log?.LogError(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}"); + Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}"); } finally {