Background and Motivation
Currently, calling .FlushAsync() on a PipeWriter provided by Pipe marks the write operation as complete, which makes the writing block eligible for release once the PipeReader advances to the writer's position (freeing the memory of an idle pipe).
However, the writer may have more to write and may be flushing only so that backlog and latency don't build up.
Because the write flag is released, a writer that immediately wants to write more races with the reader and contends for the lock. Getting more memory with .GetMemory() therefore has to go through a slower, lock-protected path.
This additional option would tell .FlushAsync not to release the write flag. Calling GetMemory() after the flush would then not need to go via contended locks, and would not risk re-renting memory from the MemoryPool that was only just returned to the pool because the PipeReader won the data race.
In fact, the PipeWriter would not need to call GetMemory() at all if it still has enough memory in its current block to write to, as that memory would not have been invalidated by .FlushAsync (releasing the write flag also invalidates the current memory, so the full process has to be repeated to reacquire some memory to write to).
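For reference, here is a minimal, self-contained sketch of the pattern a writer has to follow today: every chunk needs its own GetMemory() call, because memory obtained before a FlushAsync must not be reused after it. The class name FlushThenReacquireDemo and the chunk contents are made up for illustration, and the sketch assumes the System.IO.Pipelines package is referenced.

```csharp
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical demo type; only Pipe, PipeWriter and PipeReader are real APIs.
public static class FlushThenReacquireDemo
{
    public static async Task<string> RunAsync()
    {
        var pipe = new Pipe();
        PipeWriter writer = pipe.Writer;

        foreach (string chunk in new[] { "hello ", "world" })
        {
            // Today a fresh GetMemory call is needed for every chunk:
            // the previous memory is invalidated by FlushAsync.
            Memory<byte> memory = writer.GetMemory(chunk.Length);
            int written = Encoding.ASCII.GetBytes(chunk, memory.Span);
            writer.Advance(written);

            // Makes the bytes visible to the reader and ends the write operation.
            await writer.FlushAsync();
        }

        await writer.CompleteAsync();

        // Drain the reader side so the result can be inspected.
        var text = new StringBuilder();
        while (true)
        {
            ReadResult result = await pipe.Reader.ReadAsync();
            foreach (ReadOnlyMemory<byte> segment in result.Buffer)
            {
                text.Append(Encoding.ASCII.GetString(segment.Span));
            }
            pipe.Reader.AdvanceTo(result.Buffer.End);
            if (result.IsCompleted)
            {
                break;
            }
        }
        await pipe.Reader.CompleteAsync();
        return text.ToString();
    }
}
```

In this sketch the reader only starts after the writer has completed, so there is no contention. The cost described above shows up when a reader is running concurrently and is woken by each flush.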
Additionally, for non-Pipe-backed PipeWriters, such as a socket-based one, this flag could flow through to the socket send by passing the MSG_MORE flag to send(2) on Linux. That acts like TCP_CORK for that send only, pushing out only full packets (but without the extra syscall that TCP_CORK needs):
MSG_MORE (Since Linux 2.4.4)
The caller has more data to send. This flag is used with TCP sockets to obtain the same effect as the TCP_CORK socket option (see tcp(7)), with the difference that this flag can be set on a per-call basis.
Or the RIO_MSG_DEFER flag for RIOSend on Windows:
RIO_MSG_DEFER
The request does not need to be executed immediately. This will insert the request into the request queue, but it may or may not trigger the execution of the request.
Proposed API
public abstract partial class PipeWriter
{
    /// <summary>
    /// Makes bytes written available to <see cref="PipeReader"/> and runs <see cref="PipeReader.ReadAsync"/> continuation.
    /// </summary>
    /// <param name="isMoreData"><see cref="bool"/> indicating that more data is to be written imminently.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
    public virtual ValueTask<FlushResult> FlushAsync(bool isMoreData, CancellationToken cancellationToken = default)
        => FlushAsync(cancellationToken);
}
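To illustrate how a custom writer might consume the hint, below is a rough sketch of a decorator that treats isMoreData: true as "defer", similar in spirit to MSG_MORE on a socket. Everything named CorkingPipeWriter or ForwardedFlushes is hypothetical, and the overload is declared on the decorator itself because it does not exist on PipeWriter today. Note this is not what a Pipe-backed implementation of the proposal would do: there the flush would still publish the data and only keep the write flag held.

```csharp
#nullable enable
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical decorator; not part of System.IO.Pipelines.
public sealed class CorkingPipeWriter : PipeWriter
{
    private readonly PipeWriter _inner;

    public CorkingPipeWriter(PipeWriter inner) => _inner = inner;

    // Number of flushes actually forwarded to the inner writer.
    public int ForwardedFlushes { get; private set; }

    // Hypothetical overload mirroring the proposed signature.
    public ValueTask<FlushResult> FlushAsync(bool isMoreData, CancellationToken cancellationToken = default)
    {
        if (isMoreData)
        {
            // Assumption: this writer is allowed to defer. The data stays
            // buffered in the inner writer until the next real flush, and the
            // result reports "not canceled, not completed" without asking it.
            return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: false));
        }

        return FlushAsync(cancellationToken);
    }

    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
    {
        ForwardedFlushes++;
        return _inner.FlushAsync(cancellationToken);
    }

    // Everything else is passed straight through to the inner writer.
    public override void Advance(int bytes) => _inner.Advance(bytes);
    public override Memory<byte> GetMemory(int sizeHint = 0) => _inner.GetMemory(sizeHint);
    public override Span<byte> GetSpan(int sizeHint = 0) => _inner.GetSpan(sizeHint);
    public override void CancelPendingFlush() => _inner.CancelPendingFlush();
    public override void Complete(Exception? exception = null) => _inner.Complete(exception);
}
```

With this decorator, a caller that writes two chunks and passes isMoreData: true on the first flush causes only one flush of the inner writer. Whether deferring like this is acceptable depends on the transport, which is why the proposal leaves the default implementation as a plain forward to FlushAsync(cancellationToken).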
Usage Examples
Current
input.Advance(bytesReceived);
var flushTask = input.FlushAsync();
if (!_waitForData)
{
    // Previous buffer has been invalidated by the FlushAsync;
    // we need to ask for a completely new one, which may contend
    // with the Reader reawoken by the FlushAsync
    buffer = input.GetMemory(MinAllocBufferSize);
}
var result = await flushTask;
After
input.Advance(bytesReceived);
var flushTask = input.FlushAsync(isMoreData: !_waitForData);
if (!_waitForData)
{
    if (buffer.Length - bytesReceived >= MinAllocBufferSize)
    {
        // We have enough memory; only need to slice the existing one
        // by the same amount as the input.Advance
        buffer = buffer.Slice(bytesReceived);
    }
    else
    {
        buffer = input.GetMemory(MinAllocBufferSize);
    }
}
var result = await flushTask;
Alternative Designs
A PipeOptions-level switch that says never to release the block, even when no write is in progress. This has the disadvantage that the block cannot be released when the pipe has truly reached an idle point.
It also doesn't enable other PipeWriters, such as a socket-based one, to use MSG_MORE or RIO_MSG_DEFER.
/cc @davidfowl @halter73
FYI @stephentoub @adamsitnik as discussed at meeting.