Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Kestrel.Core/CoreStrings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,9 @@ For more information on configuring HTTPS see https://go.microsoft.com/fwlink/?l
<data name="Http2StreamAborted" xml:space="preserve">
<value>The request stream was aborted.</value>
</data>
<data name="Http2ErrorFlowControlWindowExceeded" xml:space="preserve">
<value>The client sent more data than what was available in the flow-control window.</value>
</data>
<data name="Http2ErrorConnectMustNotSendSchemeOrPath" xml:space="preserve">
<value>CONNECT requests must not send :scheme or :path headers.</value>
</data>
Expand Down
13 changes: 13 additions & 0 deletions src/Kestrel.Core/Internal/Http/Http1Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public Http1Connection(Http1ConnectionContext context)
_keepAliveTicks = ServerOptions.Limits.KeepAliveTimeout.Ticks;
_requestHeadersTimeoutTicks = ServerOptions.Limits.RequestHeadersTimeout.Ticks;

RequestBodyPipe = CreateRequestBodyPipe();
Output = new Http1OutputProducer(
_context.Transport.Output,
_context.ConnectionId,
Expand Down Expand Up @@ -470,5 +471,17 @@ protected override bool TryParseRequest(ReadResult result, out bool endConnectio
return false;
}
}

private Pipe CreateRequestBodyPipe()
=> new Pipe(new PipeOptions
(
pool: _context.MemoryPool,
readerScheduler: ServiceContext.Scheduler,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
useSynchronizationContext: false,
minimumSegmentSize: KestrelMemoryPool.MinimumSegmentSize
));
}
}
15 changes: 1 addition & 14 deletions src/Kestrel.Core/Internal/Http/HttpProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,11 @@ public HttpProtocol(IHttpProtocolContext context)

ServerOptions = ServiceContext.ServerOptions;
HttpResponseControl = this;
RequestBodyPipe = CreateRequestBodyPipe();
}

public IHttpResponseControl HttpResponseControl { get; set; }

public Pipe RequestBodyPipe { get; }
public Pipe RequestBodyPipe { get; protected set; }

public ServiceContext ServiceContext => _context.ServiceContext;
private IPEndPoint LocalEndPoint => _context.LocalEndPoint;
Expand Down Expand Up @@ -1332,17 +1331,5 @@ protected void ReportApplicationError(Exception ex)

Log.ApplicationError(ConnectionId, TraceIdentifier, ex);
}

private Pipe CreateRequestBodyPipe()
=> new Pipe(new PipeOptions
(
pool: _context.MemoryPool,
readerScheduler: ServiceContext.Scheduler,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
useSynchronizationContext: false,
minimumSegmentSize: KestrelMemoryPool.MinimumSegmentSize
));
}
}
22 changes: 20 additions & 2 deletions src/Kestrel.Core/Internal/Http/MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,18 @@ protected MessageBody(HttpProtocol context)
var result = await _context.RequestBodyPipe.Reader.ReadAsync();
var readableBuffer = result.Buffer;
var consumed = readableBuffer.End;
var actual = 0;

try
{
if (!readableBuffer.IsEmpty)
{
// buffer.Count is int
var actual = (int)Math.Min(readableBuffer.Length, buffer.Length);
// buffer.Count is int
actual = (int)Math.Min(readableBuffer.Length, buffer.Length);
var slice = readableBuffer.Slice(0, actual);
consumed = readableBuffer.GetPosition(actual);
slice.CopyTo(buffer.Span);

return actual;
}

Expand All @@ -66,6 +68,10 @@ protected MessageBody(HttpProtocol context)
finally
{
_context.RequestBodyPipe.Reader.AdvanceTo(consumed);

// Update the flow-control window after advancing the pipe reader, so we don't risk overfilling
// the pipe despite the client being well-behaved.
OnDataRead(actual);
}
}
}
Expand All @@ -79,6 +85,7 @@ protected MessageBody(HttpProtocol context)
var result = await _context.RequestBodyPipe.Reader.ReadAsync();
var readableBuffer = result.Buffer;
var consumed = readableBuffer.End;
var bytesRead = 0;

try
{
Expand All @@ -89,6 +96,9 @@ protected MessageBody(HttpProtocol context)
// REVIEW: This *could* be slower if 2 things are true
// - The WriteAsync(ReadOnlyMemory<byte>) isn't overridden on the destination
// - We change the Kestrel Memory Pool to not use pinned arrays but instead use native memory

bytesRead += memory.Length;

#if NETCOREAPP2_1
await destination.WriteAsync(memory);
#elif NETSTANDARD2_0
Expand All @@ -108,6 +118,10 @@ protected MessageBody(HttpProtocol context)
finally
{
_context.RequestBodyPipe.Reader.AdvanceTo(consumed);

// Update the flow-control window after advancing the pipe reader, so we don't risk overfilling
// the pipe despite the client being well-behaved.
OnDataRead(bytesRead);
}
}
}
Expand Down Expand Up @@ -150,6 +164,10 @@ protected virtual void OnReadStarted()
{
}

protected virtual void OnDataRead(int bytesRead)
{
}

private class ForZeroContentLength : MessageBody
{
public ForZeroContentLength(bool keepAlive)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,23 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Diagnostics;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
public class Http2OutputFlowControl
public struct FlowControl
{
private readonly Queue<Http2OutputFlowControlAwaitable> _awaitableQueue = new Queue<Http2OutputFlowControlAwaitable>();

public Http2OutputFlowControl(uint initialWindowSize)
public FlowControl(uint initialWindowSize)
{
Debug.Assert(initialWindowSize <= Http2PeerSettings.MaxWindowSize, $"{nameof(initialWindowSize)} too large.");

Available = (int)initialWindowSize;
IsAborted = false;
}

public int Available { get; private set; }
public bool IsAborted { get; private set; }

public Http2OutputFlowControlAwaitable AvailabilityAwaitable
{
get
{
Debug.Assert(!IsAborted, $"({nameof(AvailabilityAwaitable)} accessed after abort.");
Debug.Assert(Available <= 0, $"({nameof(AvailabilityAwaitable)} accessed with {Available} bytes available.");

var awaitable = new Http2OutputFlowControlAwaitable();
_awaitableQueue.Enqueue(awaitable);
return awaitable;
}
}

public void Advance(int bytes)
{
Debug.Assert(!IsAborted, $"({nameof(Advance)} called after abort.");
Expand All @@ -55,23 +40,12 @@ public bool TryUpdateWindow(int bytes)

Available += bytes;

while (Available > 0 && _awaitableQueue.Count > 0)
{
var awaitable = _awaitableQueue.Dequeue();
awaitable.Complete();
}

return true;
}

public void Abort()
{
IsAborted = true;

while (_awaitableQueue.Count > 0)
{
_awaitableQueue.Dequeue().Complete();
}
}
}
}
116 changes: 116 additions & 0 deletions src/Kestrel.Core/Internal/Http2/FlowControl/InputFlowControl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Diagnostics;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
public class InputFlowControl
{
private readonly int _initialWindowSize;
private readonly int _minWindowSizeIncrement;

private FlowControl _flow;
private int _pendingUpdateSize;
private bool _windowUpdatesDisabled;
private readonly object _flowLock = new object();

public InputFlowControl(uint initialWindowSize, uint minWindowSizeIncrement)
{
Debug.Assert(initialWindowSize >= minWindowSizeIncrement, "minWindowSizeIncrement is greater than the window size.");

_flow = new FlowControl(initialWindowSize);
_initialWindowSize = (int)initialWindowSize;
_minWindowSizeIncrement = (int)minWindowSizeIncrement;
}

public bool TryAdvance(int bytes)
{
lock (_flowLock)
{
// Even if the stream is aborted, the client should never send more data than was available in the
// flow-control window at the time of the abort.
if (bytes > _flow.Available)
{
throw new Http2ConnectionErrorException(CoreStrings.Http2ErrorFlowControlWindowExceeded, Http2ErrorCode.FLOW_CONTROL_ERROR);
}

if (_flow.IsAborted)
{
// This data won't be read by the app, so tell the caller to count the data as already consumed.
return false;
}

_flow.Advance(bytes);
return true;
}
}

public bool TryUpdateWindow(int bytes, out int updateSize)
{
lock (_flowLock)
{
updateSize = 0;

if (_flow.IsAborted)
{
// All data received by stream has already been returned to the connection window.
return false;
}

if (!_flow.TryUpdateWindow(bytes))
{
// We only try to update the window back to its initial size after the app consumes data.
// It shouldn't be possible for the window size to ever exceed Http2PeerSettings.MaxWindowSize.
Debug.Assert(false, $"{nameof(TryUpdateWindow)} attempted to grow window past max size.");
}

if (_windowUpdatesDisabled)
{
// Continue returning space to the connection window. The end of the stream has already
// been received, so don't send window updates for the stream window.
return true;
}

var potentialUpdateSize = _pendingUpdateSize + bytes;

if (potentialUpdateSize > _minWindowSizeIncrement)
{
_pendingUpdateSize = 0;
updateSize = potentialUpdateSize;
}
else
{
_pendingUpdateSize = potentialUpdateSize;
}

return true;
}
}

public void StopWindowUpdates()
{
lock (_flowLock)
{
_windowUpdatesDisabled = true;
}
}

public int Abort()
{
lock (_flowLock)
{
if (_flow.IsAborted)
{
return 0;
}

_flow.Abort();

// Tell caller to return connection window space consumed by this stream. Even if window updates have
// been disabled at the stream level, connection-level window updates may still be necessary.
return _initialWindowSize - _flow.Available;
}
}
}
}
74 changes: 74 additions & 0 deletions src/Kestrel.Core/Internal/Http2/FlowControl/OutputFlowControl.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Diagnostics;

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http2.FlowControl
{
public class OutputFlowControl
{
private FlowControl _flow;
private Queue<OutputFlowControlAwaitable> _awaitableQueue;

public OutputFlowControl(uint initialWindowSize)
{
_flow = new FlowControl(initialWindowSize);
}

public int Available => _flow.Available;
public bool IsAborted => _flow.IsAborted;

public OutputFlowControlAwaitable AvailabilityAwaitable
{
get
{
Debug.Assert(!_flow.IsAborted, $"({nameof(AvailabilityAwaitable)} accessed after abort.");
Debug.Assert(_flow.Available <= 0, $"({nameof(AvailabilityAwaitable)} accessed with {Available} bytes available.");

if (_awaitableQueue == null)
{
_awaitableQueue = new Queue<OutputFlowControlAwaitable>();
}

var awaitable = new OutputFlowControlAwaitable();
_awaitableQueue.Enqueue(awaitable);
return awaitable;
}
}

public void Advance(int bytes)
{
_flow.Advance(bytes);
}

// bytes can be negative when SETTINGS_INITIAL_WINDOW_SIZE decreases mid-connection.
// This can also cause Available to become negative which MUST be allowed.
// https://httpwg.org/specs/rfc7540.html#rfc.section.6.9.2
public bool TryUpdateWindow(int bytes)
{
if (_flow.TryUpdateWindow(bytes))
{
while (_flow.Available > 0 && _awaitableQueue?.Count > 0)
{
_awaitableQueue.Dequeue().Complete();
}

return true;
}

return false;
}

public void Abort()
{
// Make sure to set the aborted flag before running any continuations.
_flow.Abort();

while (_awaitableQueue?.Count > 0)
{
_awaitableQueue.Dequeue().Complete();
}
}
}
}
Loading