Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,27 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Filter
{
public class FilteredStreamAdapter
public class FilteredStreamAdapter : IDisposable
{
private readonly string _connectionId;
private readonly Stream _filteredStream;
private readonly Stream _socketInputStream;
private readonly IKestrelTrace _log;
private readonly MemoryPool _memory;
private MemoryPoolBlock _block;
private bool _aborted = false;

public FilteredStreamAdapter(
string connectionId,
Stream filteredStream,
MemoryPool memory,
IKestrelTrace logger,
IThreadPool threadPool)
{
SocketInput = new SocketInput(memory, threadPool);
SocketOutput = new StreamSocketOutput(filteredStream, memory);
SocketOutput = new StreamSocketOutput(connectionId, filteredStream, memory, logger);

_connectionId = connectionId;
_log = logger;
_filteredStream = filteredStream;
_socketInputStream = new SocketInputStream(SocketInput);
Expand All @@ -37,16 +41,26 @@ public FilteredStreamAdapter(

public ISocketOutput SocketOutput { get; private set; }

public void ReadInput()
public Task ReadInputAsync()
{
_block = _memory.Lease();
// Use pooled block for copy
_filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) =>
return _filteredStream.CopyToAsync(_socketInputStream, _block).ContinueWith((task, state) =>
{
((FilteredStreamAdapter)state).OnStreamClose(task);
}, this);
}

public void Abort()
{
_aborted = true;
}

public void Dispose()
{
SocketInput.Dispose();
}

private void OnStreamClose(Task copyAsyncTask)
{
_memory.Return(_block);
Expand All @@ -61,10 +75,13 @@ private void OnStreamClose(Task copyAsyncTask)
SocketInput.AbortAwaiting();
_log.LogError("FilteredStreamAdapter.CopyToAsync canceled.");
}
else if (_aborted)
{
SocketInput.AbortAwaiting();
}

try
{
_filteredStream.Dispose();
_socketInputStream.Dispose();
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
protected override void Dispose(bool disposing)
{
// Close _socketInput with a fake zero-length write that will result in a zero-length read.
_socketInput.IncomingData(null, 0, 0);
_socketInput.IncomingFin();
base.Dispose(disposing);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,57 @@ public class StreamSocketOutput : ISocketOutput
private static readonly byte[] _endChunkBytes = Encoding.ASCII.GetBytes("\r\n");
private static readonly byte[] _nullBuffer = new byte[0];

private readonly string _connectionId;
private readonly Stream _outputStream;
private readonly MemoryPool _memory;
private readonly IKestrelTrace _logger;
private MemoryPoolBlock _producingBlock;

private bool _canWrite = true;

private object _writeLock = new object();

public StreamSocketOutput(Stream outputStream, MemoryPool memory)
public StreamSocketOutput(string connectionId, Stream outputStream, MemoryPool memory, IKestrelTrace logger)
{
_connectionId = connectionId;
_outputStream = outputStream;
_memory = memory;
_logger = logger;
}

public void Write(ArraySegment<byte> buffer, bool chunk)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method needs to be changed too. We should log and swallow exceptions from _outputStream.Write and we should ignore zero length writes.

While we're at it ChunkedResponseTests should be run with the NoOpConnectionFilter like the EngineTests are.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you meant PassThroughConnectionFilter 😁

{
lock (_writeLock)
{
if (chunk && buffer.Array != null)
if (buffer.Count == 0 )
{
var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count);
_outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count);
return;
}

_outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count);
try
{
if (!_canWrite)
{
return;
}

if (chunk && buffer.Array != null)
{
var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count);
_outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count);
}

_outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count);

if (chunk && buffer.Array != null)
if (chunk && buffer.Array != null)
{
_outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length);
}
}
catch (Exception ex)
{
_outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length);
_canWrite = false;
_logger.ConnectionError(_connectionId, ex);
}
}
}
Expand All @@ -65,14 +89,38 @@ public void ProducingComplete(MemoryPoolIterator end)
var block = _producingBlock;
while (block != end.Block)
{
_outputStream.Write(block.Data.Array, block.Data.Offset, block.Data.Count);
// If we don't handle an exception from _outputStream.Write() here, we'll leak memory blocks.
if (_canWrite)
{
try
{
_outputStream.Write(block.Data.Array, block.Data.Offset, block.Data.Count);
}
catch (Exception ex)
{
_canWrite = false;
_logger.ConnectionError(_connectionId, ex);
}
}

var returnBlock = block;
block = block.Next;
returnBlock.Pool.Return(returnBlock);
}

if (_canWrite)
{
try
{
_outputStream.Write(end.Block.Array, end.Block.Data.Offset, end.Index - end.Block.Data.Offset);
}
catch (Exception ex)
{
_canWrite = false;
_logger.ConnectionError(_connectionId, ex);
}
}

_outputStream.Write(end.Block.Array, end.Block.Data.Offset, end.Index - end.Block.Data.Offset);
end.Block.Pool.Return(end.Block);
}
}
Expand Down
29 changes: 19 additions & 10 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class Connection : ConnectionContext, IConnectionControl
private Frame _frame;
private ConnectionFilterContext _filterContext;
private LibuvStream _libuvStream;
private FilteredStreamAdapter _filteredStreamAdapter;
private Task _readInputContinuation;

private readonly SocketInput _rawSocketInput;
private readonly SocketOutput _rawSocketOutput;
Expand Down Expand Up @@ -175,13 +177,20 @@ public virtual void Abort()
// Called on Libuv thread
public virtual void OnSocketClosed()
{
_rawSocketInput.Dispose();

// If a connection filter was applied there will be two SocketInputs.
// If a connection filter failed, SocketInput will be null.
if (SocketInput != null && SocketInput != _rawSocketInput)
if (_filteredStreamAdapter != null)
{
_filteredStreamAdapter.Abort();
_rawSocketInput.IncomingFin();
_readInputContinuation.ContinueWith((task, state) =>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?.

{
((Connection)state)._filterContext.Connection.Dispose();
((Connection)state)._filteredStreamAdapter.Dispose();
((Connection)state)._rawSocketInput.Dispose();
}, this);
}
else
{
SocketInput.Dispose();
_rawSocketInput.Dispose();
}

lock (_stateLock)
Expand All @@ -207,12 +216,12 @@ private void ApplyConnectionFilter()

if (_filterContext.Connection != _libuvStream)
{
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory, Log, ThreadPool);
_filteredStreamAdapter = new FilteredStreamAdapter(ConnectionId, _filterContext.Connection, Memory, Log, ThreadPool);

SocketInput = filteredStreamAdapter.SocketInput;
SocketOutput = filteredStreamAdapter.SocketOutput;
SocketInput = _filteredStreamAdapter.SocketInput;
SocketOutput = _filteredStreamAdapter.SocketOutput;

filteredStreamAdapter.ReadInput();
_readInputContinuation = _filteredStreamAdapter.ReadInputAsync();
}
else
{
Expand Down
6 changes: 6 additions & 0 deletions src/Microsoft.AspNetCore.Server.Kestrel/Http/SocketInput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public void IncomingDeferred()
}
}

public void IncomingFin()
{
// Force a FIN
IncomingData(null, 0, 0);
}

private void Complete()
{
var awaitableState = Interlocked.Exchange(
Expand Down
Loading