Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;

namespace Microsoft.AspNetCore.Server.Kestrel.Http
Expand Down Expand Up @@ -314,7 +313,11 @@ async Task<Stream> IHttpUpgradeFeature.UpgradeAsync()

await FlushAsync(default(CancellationToken));

return DuplexStream;
if (_frameState.TransitionToState(RequestState.UpgradedRequest) == RequestState.UpgradedRequest)
{
return DuplexStream;
}
throw new IOException("Failed to upgrade request");
}

IEnumerator<KeyValuePair<Type, object>> IEnumerable<KeyValuePair<Type, object>>.GetEnumerator() => FastEnumerable().GetEnumerator();
Expand Down
82 changes: 56 additions & 26 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ public abstract partial class Frame : FrameContext, IFrameControl

protected List<KeyValuePair<Func<object, Task>, object>> _onCompleted;

private bool _requestProcessingStarted;
protected FrameState _frameState;
private Task _requestProcessingTask;
protected volatile bool _requestProcessingStopping; // volatile, see: https://msdn.microsoft.com/en-us/library/x13ttww7.aspx
protected int _requestAborted;
protected CancellationTokenSource _abortedCts;
protected CancellationToken? _manuallySetRequestAbortToken;

Expand Down Expand Up @@ -92,13 +90,15 @@ public Frame(ConnectionContext context,
_localEndPoint = localEndPoint;
_prepareRequest = prepareRequest;
_pathBase = context.ServerAddress.PathBase;
if (ReuseStreams)
if (Settings.ReuseStreams)
{
_requestBody = new FrameRequestStream();
_responseBody = new FrameResponseStream(this);
_duplexStream = new FrameDuplexStream(_requestBody, _responseBody);
}

_frameState = new FrameState(this, context.Settings);

FrameControl = this;
Reset();
}
Expand Down Expand Up @@ -167,7 +167,7 @@ public CancellationToken RequestAborted
var cts = _abortedCts;
return
cts != null ? cts.Token :
(Volatile.Read(ref _requestAborted) == 1) ? new CancellationToken(true) :
_frameState.CurrentState == RequestState.Aborted ? new CancellationToken(true) :
RequestAbortedSource.Token;
}
set
Expand All @@ -185,20 +185,26 @@ private CancellationTokenSource RequestAbortedSource
// Get the abort token, lazily-initializing it if necessary.
// Make sure it's canceled if an abort request already came in.
var cts = LazyInitializer.EnsureInitialized(ref _abortedCts, () => new CancellationTokenSource());
if (Volatile.Read(ref _requestAborted) == 1)
if (_frameState.CurrentState == RequestState.Aborted)
{
cts.Cancel();
}
return cts;
}
}

public bool HasResponseStarted
{
get { return _responseStarted; }
}

public void Reset()
public bool Reset()
{
if (_frameState.TransitionToState(RequestState.Waiting) != RequestState.Waiting)
{
return false;
}

_onStarting = null;
_onCompleted = null;

Expand Down Expand Up @@ -237,6 +243,8 @@ public void Reset()

_manuallySetRequestAbortToken = null;
_abortedCts = null;

return true;
}

public void ResetResponseHeaders()
Expand All @@ -255,9 +263,8 @@ public void ResetResponseHeaders()
/// </summary>
public void Start()
{
if (!_requestProcessingStarted)
if (_frameState.TransitionToState(RequestState.Waiting) == RequestState.Waiting)
{
_requestProcessingStarted = true;
_requestProcessingTask =
Task.Factory.StartNew(
(o) => ((Frame)o).RequestProcessingAsync(),
Expand All @@ -276,10 +283,8 @@ public void Start()
/// </summary>
public Task Stop()
{
if (!_requestProcessingStopping)
{
_requestProcessingStopping = true;
}
_frameState.TransitionToState(RequestState.Stopped);

return _requestProcessingTask ?? TaskUtilities.CompletedTask;
}

Expand All @@ -288,10 +293,8 @@ public Task Stop()
/// </summary>
public void Abort()
{
if (Interlocked.CompareExchange(ref _requestAborted, 1, 0) == 0)
if (_frameState.TransitionToState(RequestState.Aborted) != RequestState.Aborted)
{
_requestProcessingStopping = true;

_requestBody?.Abort();
_responseBody?.Abort();

Expand All @@ -304,16 +307,18 @@ public void Abort()
{
Log.LogError(0, ex, "Abort");
}

try
{
RequestAbortedSource.Cancel();
}
catch (Exception ex)
finally
{
Log.LogError(0, ex, "Abort");
try
{
RequestAbortedSource.Cancel();
}
catch (Exception ex)
{
Log.LogError(0, ex, "Abort");
}
_abortedCts = null;
}
_abortedCts = null;
}
}

Expand Down Expand Up @@ -552,7 +557,7 @@ protected Task ProduceEnd()
if (_responseStarted)
{
// We can no longer respond with a 500, so we simply close the connection.
_requestProcessingStopping = true;
_frameState.TransitionToState(RequestState.Stopped);
return TaskUtilities.CompletedTask;
}
else
Expand All @@ -567,6 +572,20 @@ protected Task ProduceEnd()

if (!_responseStarted)
{
var frameState = _frameState.CurrentState;
if (frameState > RequestState.Stopping)
{
if (frameState < RequestState.Stopped)
{
// State is status code
StatusCode = frameState;
}
else
{
StatusCode = 500;
}
ReasonPhrase = null;
}
return ProduceEndAwaited();
}

Expand Down Expand Up @@ -678,12 +697,23 @@ protected bool TakeStartLine(SocketInput input)
{
string method;
var begin = scan;
if (!begin.GetKnownMethod(ref scan,out method))
if (begin.GetKnownMethod(ref scan, out method))
{
if (_frameState.TransitionToState(RequestState.ReadingHeaders) != RequestState.ReadingHeaders)
{
return false;
}
}
else
{
if (scan.Seek(ref _vectorSpaces) == -1)
{
return false;
}
if (_frameState.TransitionToState(RequestState.ReadingHeaders) != RequestState.ReadingHeaders)
{
return false;
}
method = begin.GetAsciiString(scan);
scan.Take();
}
Expand Down
25 changes: 15 additions & 10 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/FrameOfT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public override async Task RequestProcessingAsync()
{
try
{
while (!_requestProcessingStopping)
while (_frameState.CurrentState == RequestState.Waiting)
{
while (!_requestProcessingStopping && !TakeStartLine(SocketInput))
while (_frameState.CurrentState < RequestState.Stopping && !TakeStartLine(SocketInput))
{
if (SocketInput.RemoteIntakeFin)
{
Expand All @@ -52,7 +52,7 @@ public override async Task RequestProcessingAsync()
await SocketInput;
}

while (!_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders))
while (_frameState.CurrentState < RequestState.Stopping && !TakeMessageHeaders(SocketInput, _requestHeaders))
{
if (SocketInput.RemoteIntakeFin)
{
Expand All @@ -61,13 +61,13 @@ public override async Task RequestProcessingAsync()
await SocketInput;
}

if (!_requestProcessingStopping)
if (_frameState.TransitionToState(RequestState.ExecutingRequest) == RequestState.ExecutingRequest)
{
var messageBody = MessageBody.For(HttpVersion, _requestHeaders, this);
_keepAlive = messageBody.RequestKeepAlive;

// _duplexStream may be null if flag switched while running
if (!ReuseStreams || _duplexStream == null)
if (!Settings.ReuseStreams || _duplexStream == null)
{
_requestBody = new FrameRequestStream();
_responseBody = new FrameResponseStream(this);
Expand Down Expand Up @@ -111,8 +111,8 @@ public override async Task RequestProcessingAsync()

_application.DisposeContext(context, _applicationException);

// If _requestAbort is set, the connection has already been closed.
if (Volatile.Read(ref _requestAborted) == 0)
// If Aborted, the connection has already been closed.
if (_frameState.CurrentState != RequestState.Aborted)
{
_responseBody.ResumeAcceptingWrites();
await ProduceEnd();
Expand All @@ -135,7 +135,10 @@ public override async Task RequestProcessingAsync()
}
}

Reset();
if (!Reset())
{
return;
}
}
}
catch (Exception ex)
Expand All @@ -146,10 +149,12 @@ public override async Task RequestProcessingAsync()
{
try
{
var frameState = _frameState.CurrentState;
_frameState.Dispose();
_abortedCts = null;

// If _requestAborted is set, the connection has already been closed.
if (Volatile.Read(ref _requestAborted) == 0)
// If Aborted, the connection has already been closed.
if (frameState != RequestState.Aborted)
{
// Inform client no more data will ever arrive
ConnectionControl.End(ProduceEndType.SocketShutdownSend);
Expand Down
Loading