Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ internal delegate uint StreamSendDelegate(
QUIC_SEND_FLAGS flags,
IntPtr clientSendContext);

internal delegate uint StreamReceiveCompleteDelegate(
internal delegate void StreamReceiveCompleteDelegate(
SafeMsQuicStreamHandle stream,
ulong bufferLength);

Expand Down Expand Up @@ -1376,9 +1376,8 @@ internal uint StreamSend(SafeMsQuicStreamHandle stream, QuicBuffer* buffers, uin

return __retVal;
}
internal uint StreamReceiveComplete(SafeMsQuicStreamHandle stream, ulong bufferLength)
internal void StreamReceiveComplete(SafeMsQuicStreamHandle stream, ulong bufferLength)
{
uint __retVal;
//
// Setup
//
Expand All @@ -1390,7 +1389,7 @@ internal uint StreamReceiveComplete(SafeMsQuicStreamHandle stream, ulong bufferL
//
stream.DangerousAddRef(ref stream__addRefd);
IntPtr __stream_gen_native = stream.DangerousGetHandle();
__retVal = ((delegate* unmanaged[Cdecl]<IntPtr, ulong, uint>)_functionPointer)(__stream_gen_native, bufferLength);
((delegate* unmanaged[Cdecl]<IntPtr, ulong, void>)_functionPointer)(__stream_gen_native, bufferLength);
}
finally
{
Expand All @@ -1400,8 +1399,6 @@ internal uint StreamReceiveComplete(SafeMsQuicStreamHandle stream, ulong bufferL
if (stream__addRefd)
stream.DangerousRelease();
}

return __retVal;
}
internal uint StreamReceiveSetEnabled(SafeMsQuicStreamHandle stream, bool enabled)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
long abortError;
bool preCanceled = false;

int bytesRead = -1;
bool reenableReceive = false;
lock (_state)
{
initialReadState = _state.ReadState;
Expand Down Expand Up @@ -493,22 +495,32 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
{
_state.ReadState = ReadState.None;

int taken = CopyMsQuicBuffersToUserBuffer(_state.ReceiveQuicBuffers.AsSpan(0, _state.ReceiveQuicBuffersCount), destination.Span);
ReceiveComplete(taken);
bytesRead = CopyMsQuicBuffersToUserBuffer(_state.ReceiveQuicBuffers.AsSpan(0, _state.ReceiveQuicBuffersCount), destination.Span);

if (taken != _state.ReceiveQuicBuffersTotalBytes)
if (bytesRead != _state.ReceiveQuicBuffersTotalBytes)
{
// Need to re-enable receives because MsQuic will pause them when we don't consume the entire buffer.
EnableReceive();
reenableReceive = true;
}
else if (_state.ReceiveIsFinal)
{
// This was a final message and we've consumed everything. We can complete the state without waiting for PEER_SEND_SHUTDOWN
_state.ReadState = ReadState.ReadsCompleted;
}
}
}

// methods below need to be called outside of the lock
if (bytesRead > -1)
{
ReceiveComplete(bytesRead);

return new ValueTask<int>(taken);
if (reenableReceive)
{
EnableReceive();
}

return new ValueTask<int>(bytesRead);
}

// All success scenarios returned at this point. Failure scenarios below:
Expand Down Expand Up @@ -859,6 +871,7 @@ private void Dispose(bool disposing)

private void EnableReceive()
{
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
uint status = MsQuicApi.Api.StreamReceiveSetEnabledDelegate(_state.Handle, enabled: true);
QuicExceptionHelpers.ThrowIfFailed(status, "StreamReceiveSetEnabled failed.");
}
Expand Down Expand Up @@ -1475,8 +1488,8 @@ private unsafe ValueTask SendReadOnlyMemoryListAsync(

private void ReceiveComplete(int bufferLength)
{
uint status = MsQuicApi.Api.StreamReceiveCompleteDelegate(_state.Handle, (ulong)bufferLength);
QuicExceptionHelpers.ThrowIfFailed(status, "Could not complete receive call.");
Debug.Assert(!Monitor.IsEntered(_state), "!Monitor.IsEntered(_state)");
MsQuicApi.Api.StreamReceiveCompleteDelegate(_state.Handle, (ulong)bufferLength);
}

// This can fail if the stream isn't started.
Expand Down