Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void Start()
}
}

public void Abort()
public virtual void Abort()
{
if (_frame != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,7 @@ async Task<Stream> IHttpUpgradeFeature.UpgradeAsync()
}
}

await ProduceStartAndFireOnStarting();

// Force flush
await SocketOutput.WriteAsync(_emptyData);
await FlushAsync(CancellationToken.None);

return DuplexStream;
}
Expand Down
45 changes: 26 additions & 19 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public abstract partial class Frame : FrameContext, IFrameControl
private bool _requestProcessingStarted;
private Task _requestProcessingTask;
protected volatile bool _requestProcessingStopping; // volatile, see: https://msdn.microsoft.com/en-us/library/x13ttww7.aspx
protected volatile bool _requestAborted;
protected int _requestAborted;
protected CancellationTokenSource _abortedCts;
protected CancellationToken? _manuallySetRequestAbortToken;

Expand Down Expand Up @@ -167,7 +167,7 @@ public CancellationToken RequestAborted
var cts = _abortedCts;
return
cts != null ? cts.Token :
_requestAborted ? new CancellationToken(true) :
(Volatile.Read(ref _requestAborted) == 1) ? new CancellationToken(true) :
RequestAbortedSource.Token;
}
set
Expand All @@ -185,7 +185,7 @@ private CancellationTokenSource RequestAbortedSource
// Get the abort token, lazily-initializing it if necessary.
// Make sure it's canceled if an abort request already came in.
var cts = LazyInitializer.EnsureInitialized(ref _abortedCts, () => new CancellationTokenSource());
if (_requestAborted)
if (Volatile.Read(ref _requestAborted) == 1)
{
cts.Cancel();
}
Expand Down Expand Up @@ -288,24 +288,31 @@ public Task Stop()
/// </summary>
public void Abort()
{
_requestProcessingStopping = true;
_requestAborted = true;
if (Interlocked.CompareExchange(ref _requestAborted, 1, 0) == 0)
{
_requestProcessingStopping = true;

_requestBody?.Abort();
_responseBody?.Abort();
_requestBody?.Abort();
_responseBody?.Abort();

try
{
ConnectionControl.End(ProduceEndType.SocketDisconnect);
SocketInput.AbortAwaiting();
RequestAbortedSource.Cancel();
}
catch (Exception ex)
{
Log.LogError("Abort", ex);
}
finally
{
try
{
ConnectionControl.End(ProduceEndType.SocketDisconnect);
SocketInput.AbortAwaiting();
}
catch (Exception ex)
{
Log.LogError("Abort", ex);
}

try
{
RequestAbortedSource.Cancel();
}
catch (Exception ex)
{
Log.LogError("Abort", ex);
}
_abortedCts = null;
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/FrameOfT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
Expand Down Expand Up @@ -111,7 +112,7 @@ public override async Task RequestProcessingAsync()
_application.DisposeContext(context, _applicationException);

// If _requestAbort is set, the connection has already been closed.
if (!_requestAborted)
if (Volatile.Read(ref _requestAborted) == 0)
{
_responseBody.ResumeAcceptingWrites();
await ProduceEnd();
Expand Down Expand Up @@ -148,7 +149,7 @@ public override async Task RequestProcessingAsync()
_abortedCts = null;

// If _requestAborted is set, the connection has already been closed.
if (!_requestAborted)
if (Volatile.Read(ref _requestAborted) == 0)
{
// Inform client no more data will ever arrive
ConnectionControl.End(ProduceEndType.SocketShutdownSend);
Expand Down
31 changes: 19 additions & 12 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/FrameRequestStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;

namespace Microsoft.AspNetCore.Server.Kestrel.Http
{
Expand Down Expand Up @@ -51,16 +52,14 @@ public override void SetLength(long value)

public override int Read(byte[] buffer, int offset, int count)
{
ValidateState();

// ValueTask uses .GetAwaiter().GetResult() if necessary
return ReadAsync(buffer, offset, count).Result;
}

#if NET451
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
ValidateState();
ValidateState(CancellationToken.None);

var task = ReadAsync(buffer, offset, count, CancellationToken.None, state);
if (callback != null)
Expand All @@ -77,7 +76,7 @@ public override int EndRead(IAsyncResult asyncResult)

private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
ValidateState();
ValidateState(cancellationToken);

var tcs = new TaskCompletionSource<int>(state);
var task = _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
Expand All @@ -103,10 +102,13 @@ private Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationTo

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValidateState();

// Needs .AsTask to match Stream's Async method return types
return _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken).AsTask();
var task = ValidateState(cancellationToken);
if (task == null)
{
// Needs .AsTask to match Stream's Async method return types
return _body.ReadAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken).AsTask();
}
return task;
}

public override void Write(byte[] buffer, int offset, int count)
Expand Down Expand Up @@ -149,24 +151,29 @@ public void StopAcceptingReads()
public void Abort()
{
// We don't want to throw an ODE until the app func actually completes.
// If the request is aborted, we throw an IOException instead.
// If the request is aborted, we throw an TaskCanceledException instead.
if (_state != FrameStreamState.Closed)
{
_state = FrameStreamState.Aborted;
}
}

private void ValidateState()
private Task<int> ValidateState(CancellationToken cancellationToken)
{
switch (_state)
{
case FrameStreamState.Open:
return;
if (cancellationToken.IsCancellationRequested)
{
return TaskUtilities.GetCancelledZeroTask();
}
break;
case FrameStreamState.Closed:
throw new ObjectDisposedException(nameof(FrameRequestStream));
case FrameStreamState.Aborted:
throw new IOException("The request has been aborted.");
return TaskUtilities.GetCancelledZeroTask();
}
return null;
}
}
}
40 changes: 28 additions & 12 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/FrameResponseStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;

namespace Microsoft.AspNetCore.Server.Kestrel.Http
{
Expand Down Expand Up @@ -37,16 +38,19 @@ public override long Length

public override void Flush()
{
ValidateState();
ValidateState(CancellationToken.None);

_context.FrameControl.Flush();
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
ValidateState();

return _context.FrameControl.FlushAsync(cancellationToken);
var task = ValidateState(cancellationToken);
if (task == null)
{
return _context.FrameControl.FlushAsync(cancellationToken);
}
return task;
}

public override long Seek(long offset, SeekOrigin origin)
Expand All @@ -66,16 +70,19 @@ public override int Read(byte[] buffer, int offset, int count)

public override void Write(byte[] buffer, int offset, int count)
{
ValidateState();
ValidateState(CancellationToken.None);

_context.FrameControl.Write(new ArraySegment<byte>(buffer, offset, count));
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValidateState();

return _context.FrameControl.WriteAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
var task = ValidateState(cancellationToken);
if (task == null)
{
return _context.FrameControl.WriteAsync(new ArraySegment<byte>(buffer, offset, count), cancellationToken);
}
return task;
}

public Stream StartAcceptingWrites()
Expand Down Expand Up @@ -112,24 +119,33 @@ public void StopAcceptingWrites()
public void Abort()
{
// We don't want to throw an ODE until the app func actually completes.
// If the request is aborted, we throw an IOException instead.
if (_state != FrameStreamState.Closed)
{
_state = FrameStreamState.Aborted;
}
}

private void ValidateState()
private Task ValidateState(CancellationToken cancellationToken)
{
switch (_state)
{
case FrameStreamState.Open:
return;
if (cancellationToken.IsCancellationRequested)
{
return TaskUtilities.GetCancelledTask(cancellationToken);
}
break;
case FrameStreamState.Closed:
throw new ObjectDisposedException(nameof(FrameResponseStream));
case FrameStreamState.Aborted:
throw new IOException("The request has been aborted.");
if (cancellationToken.IsCancellationRequested)
{
// Aborted state only throws on write if cancellationToken requests it
return TaskUtilities.GetCancelledTask(cancellationToken);
}
break;
}
return null;
}
}
}
2 changes: 0 additions & 2 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/MessageBody.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Primitives;

namespace Microsoft.AspNetCore.Server.Kestrel.Http
{
Expand Down
7 changes: 6 additions & 1 deletion src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketInput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Server.Kestrel.Infrastructure;

namespace Microsoft.AspNetCore.Server.Kestrel.Http
Expand Down Expand Up @@ -184,7 +185,7 @@ public void ConsumingComplete(

public void AbortAwaiting()
{
_awaitableError = new ObjectDisposedException(nameof(SocketInput), "The request was aborted");
_awaitableError = new TaskCanceledException("The request was aborted");

Complete();
}
Expand Down Expand Up @@ -238,6 +239,10 @@ public void GetResult()
var error = _awaitableError;
if (error != null)
{
if (error is TaskCanceledException || error is InvalidOperationException)
{
throw error;
}
throw new IOException(error.Message, error);
}
}
Expand Down
Loading