Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public StreamSocketOutput(Stream outputStream, MemoryPool2 memory)
_memory = memory;
}

public void Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
public void Write(ArraySegment<byte> buffer, bool chunk)
{
lock (_writeLock)
{
Expand All @@ -47,10 +47,10 @@ public void Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
}
}

public Task WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken)
public Task WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
{
// TODO: Use _outputStream.WriteAsync
Write(buffer, immediate, chunk);
Write(buffer, chunk);
return TaskUtilities.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,10 @@ async Task<Stream> IHttpUpgradeFeature.UpgradeAsync()
}
}

await ProduceStartAndFireOnStarting(immediate: true);
await ProduceStartAndFireOnStarting();

// Force flush
await SocketOutput.WriteAsync(_emptyData);

return DuplexStream;
}
Expand Down
65 changes: 29 additions & 36 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -392,19 +392,19 @@ protected async Task FireOnCompleted()

public void Flush()
{
ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult();
SocketOutput.Write(_emptyData, immediate: true);
ProduceStartAndFireOnStarting().GetAwaiter().GetResult();
SocketOutput.Write(_emptyData);
}

public async Task FlushAsync(CancellationToken cancellationToken)
{
await ProduceStartAndFireOnStarting(immediate: false);
await SocketOutput.WriteAsync(_emptyData, immediate: true, cancellationToken: cancellationToken);
await ProduceStartAndFireOnStarting();
await SocketOutput.WriteAsync(_emptyData, cancellationToken: cancellationToken);
}

public void Write(ArraySegment<byte> data)
{
ProduceStartAndFireOnStarting(immediate: false).GetAwaiter().GetResult();
ProduceStartAndFireOnStarting().GetAwaiter().GetResult();

if (_autoChunk)
{
Expand All @@ -416,7 +416,7 @@ public void Write(ArraySegment<byte> data)
}
else
{
SocketOutput.Write(data, immediate: true);
SocketOutput.Write(data);
}
}

Expand All @@ -437,13 +437,13 @@ public Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationTo
}
else
{
return SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken);
return SocketOutput.WriteAsync(data, cancellationToken: cancellationToken);
}
}

public async Task WriteAsyncAwaited(ArraySegment<byte> data, CancellationToken cancellationToken)
{
await ProduceStartAndFireOnStarting(immediate: false);
await ProduceStartAndFireOnStarting();

if (_autoChunk)
{
Expand All @@ -455,23 +455,23 @@ public async Task WriteAsyncAwaited(ArraySegment<byte> data, CancellationToken c
}
else
{
await SocketOutput.WriteAsync(data, immediate: true, cancellationToken: cancellationToken);
await SocketOutput.WriteAsync(data, cancellationToken: cancellationToken);
}
}

private void WriteChunked(ArraySegment<byte> data)
{
SocketOutput.Write(data, immediate: false, chunk: true);
SocketOutput.Write(data, chunk: true);
}

private Task WriteChunkedAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
return SocketOutput.WriteAsync(data, immediate: false, chunk: true, cancellationToken: cancellationToken);
return SocketOutput.WriteAsync(data, chunk: true, cancellationToken: cancellationToken);
}

private Task WriteChunkedResponseSuffix()
{
return SocketOutput.WriteAsync(_endChunkedResponseBytes, immediate: true);
return SocketOutput.WriteAsync(_endChunkedResponseBytes);
}

private static ArraySegment<byte> CreateAsciiByteArraySegment(string text)
Expand All @@ -493,13 +493,13 @@ public void ProduceContinue()
}
}

public Task ProduceStartAndFireOnStarting(bool immediate = true)
public Task ProduceStartAndFireOnStarting()
{
if (_responseStarted) return TaskUtilities.CompletedTask;

if (_onStarting != null)
{
return FireOnStartingProduceStart(immediate: immediate);
return ProduceStartAndFireOnStartingAwaited();
}

if (_applicationException != null)
Expand All @@ -509,10 +509,12 @@ public Task ProduceStartAndFireOnStarting(bool immediate = true)
_applicationException);
}

return ProduceStart(immediate, appCompleted: false);
ProduceStart(appCompleted: false);

return TaskUtilities.CompletedTask;
}

private async Task FireOnStartingProduceStart(bool immediate)
private async Task ProduceStartAndFireOnStartingAwaited()
{
await FireOnStarting();

Expand All @@ -523,17 +525,17 @@ private async Task FireOnStartingProduceStart(bool immediate)
_applicationException);
}

await ProduceStart(immediate, appCompleted: false);
ProduceStart(appCompleted: false);
}

private Task ProduceStart(bool immediate, bool appCompleted)
private void ProduceStart(bool appCompleted)
{
if (_responseStarted) return TaskUtilities.CompletedTask;
if (_responseStarted) return;
_responseStarted = true;

var statusBytes = ReasonPhrases.ToStatusBytes(StatusCode, ReasonPhrase);

return CreateResponseHeader(statusBytes, appCompleted, immediate);
CreateResponseHeader(statusBytes, appCompleted);
}

protected Task ProduceEnd()
Expand All @@ -556,7 +558,6 @@ protected Task ProduceEnd()
}
}


if (!_responseStarted)
{
return ProduceEndAwaited();
Expand All @@ -567,7 +568,10 @@ protected Task ProduceEnd()

private async Task ProduceEndAwaited()
{
await ProduceStart(immediate: true, appCompleted: true);
ProduceStart(appCompleted: true);

// Force flush
await SocketOutput.WriteAsync(_emptyData);

await WriteSuffix();
}
Expand Down Expand Up @@ -599,13 +603,11 @@ private async Task WriteAutoChunkSuffixAwaited()
}
}

private Task CreateResponseHeader(
private void CreateResponseHeader(
byte[] statusBytes,
bool appCompleted,
bool immediate)
bool appCompleted)
{
var begin = SocketOutput.ProducingStart();
var end = begin;
var end = SocketOutput.ProducingStart();
if (_keepAlive)
{
foreach (var connectionValue in _responseHeaders.HeaderConnection)
Expand Down Expand Up @@ -659,15 +661,6 @@ private Task CreateResponseHeader(
end.CopyFrom(_bytesEndHeaders, 0, _bytesEndHeaders.Length);

SocketOutput.ProducingComplete(end);

if (immediate)
{
return SocketOutput.WriteAsync(default(ArraySegment<byte>), immediate: true);
}
else
{
return TaskUtilities.CompletedTask;
}
}

protected bool TakeStartLine(SocketInput input)
Expand Down
4 changes: 2 additions & 2 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/ISocketOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Http
/// </summary>
public interface ISocketOutput
{
void Write(ArraySegment<byte> buffer, bool immediate = true, bool chunk = false);
Task WriteAsync(ArraySegment<byte> buffer, bool immediate = true, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken));
void Write(ArraySegment<byte> buffer, bool chunk = false);
Task WriteAsync(ArraySegment<byte> buffer, bool chunk = false, CancellationToken cancellationToken = default(CancellationToken));

/// <summary>
/// Returns an iterator pointing to the tail of the response buffer. Response data can be appended
Expand Down
21 changes: 6 additions & 15 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public SocketOutput(

public Task WriteAsync(
ArraySegment<byte> buffer,
bool immediate = true,
bool chunk = false,
bool socketShutdownSend = false,
bool socketDisconnect = false,
Expand Down Expand Up @@ -130,13 +129,7 @@ public Task WriteAsync(
_nextWriteContext.SocketDisconnect = true;
}

if (!immediate)
{
// immediate==false calls always return complete tasks, because there is guaranteed
// to be a subsequent immediate==true call which will go down one of the previous code-paths
_numBytesPreCompleted += buffer.Count;
}
else if (_lastWriteError == null &&
if (_lastWriteError == null &&
_tasksPending.Count == 0 &&
_numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted)
{
Expand All @@ -155,7 +148,7 @@ public Task WriteAsync(
});
}

if (!_writePending && immediate)
if (!_writePending)
{
_writePending = true;
scheduleWrite = true;
Expand All @@ -177,13 +170,11 @@ public void End(ProduceEndType endType)
{
case ProduceEndType.SocketShutdownSend:
WriteAsync(default(ArraySegment<byte>),
immediate: true,
socketShutdownSend: true,
socketDisconnect: false);
break;
case ProduceEndType.SocketDisconnect:
WriteAsync(default(ArraySegment<byte>),
immediate: true,
socketShutdownSend: false,
socketDisconnect: true);
break;
Expand Down Expand Up @@ -391,14 +382,14 @@ private void PoolWriteContext(WriteContext writeContext)
}
}

void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate, bool chunk)
void ISocketOutput.Write(ArraySegment<byte> buffer, bool chunk)
{
WriteAsync(buffer, immediate, chunk, isSync: true).GetAwaiter().GetResult();
WriteAsync(buffer, chunk, isSync: true).GetAwaiter().GetResult();
}

Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, bool chunk, CancellationToken cancellationToken)
Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
{
return WriteAsync(buffer, immediate, chunk);
return WriteAsync(buffer, chunk);
}

private static void BytesBetween(MemoryPoolIterator2 start, MemoryPoolIterator2 end, out int bytes, out int buffers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Testing.xunit;
using Xunit;
Expand Down Expand Up @@ -166,6 +167,50 @@ await connection.ReceiveEnd(
}
}
}

[ConditionalFact]
[FrameworkSkipCondition(RuntimeFrameworks.Mono, SkipReason = "Test hangs after execution on Mono.")]
public async Task WritesAreFlushedPriorToResponseCompletion()
{
var flushWh = new ManualResetEventSlim();

using (var server = new TestServer(async httpContext =>
{
var response = httpContext.Response;
response.Headers.Clear();
await response.Body.WriteAsync(Encoding.ASCII.GetBytes("Hello "), 0, 6);

// Don't complete response until client has received the first chunk.
flushWh.Wait();

await response.Body.WriteAsync(Encoding.ASCII.GetBytes("World!"), 0, 6);
}))
{
using (var connection = new TestConnection())
{
await connection.SendEnd(
"GET / HTTP/1.1",
"",
"");
await connection.Receive(
"HTTP/1.1 200 OK",
"Transfer-Encoding: chunked",
"",
"6",
"Hello ",
"");

flushWh.Set();

await connection.ReceiveEnd(
"6",
"World!",
"0",
"",
"");
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,6 @@ public async Task FailedWritesResultInAbortedRequest(ServiceContext testContext)
connectionCloseWh.Wait();

response.Headers.Clear();
response.Headers["Content-Length"] = new[] { "5" };

try
{
Expand Down
Loading