-
Notifications
You must be signed in to change notification settings - Fork 4.8k
avoid async overhead in ReadNextLineAsync when possible #23210
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,16 @@ private async Task<bool> TryGetNextChunk(CancellationToken cancellationToken) | |
| Debug.Assert(_chunkBytesRemaining == 0); | ||
|
|
||
| // Start of chunk, read chunk size. | ||
| ulong chunkSize = ParseHexSize(await _connection.ReadNextLineAsync(cancellationToken).ConfigureAwait(false)); | ||
| ArraySegment<byte> line; | ||
| while (!_connection.TryReadNextLine(out line)) | ||
| { | ||
| if (!await _connection.FillAsync(cancellationToken).ConfigureAwait(false)) | ||
| { | ||
| throw new IOException(SR.net_http_invalid_response); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be worth moving this throw into a helper.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or, if every time we call FillAsync we expect it to successfully get more or else it's an error, the exception can just be moved into FillAsync. |
||
| } | ||
| } | ||
|
|
||
| ulong chunkSize = ParseHexSize(line); | ||
| _chunkBytesRemaining = chunkSize; | ||
|
|
||
| if (chunkSize > 0) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -275,14 +275,33 @@ await WriteStringAsync( | |
|
|
||
| // Parse the response status line and headers | ||
| var response = new HttpResponseMessage() { RequestMessage = request, Content = new HttpConnectionContent(CancellationToken.None) }; | ||
| ParseStatusLine(await ReadNextLineAsync(cancellationToken).ConfigureAwait(false), response); | ||
|
|
||
| ArraySegment<byte> line; | ||
| while (!TryReadNextLine(out line)) | ||
| { | ||
| if (!await FillAsync(cancellationToken).ConfigureAwait(false)) | ||
| { | ||
| throw new IOException(SR.net_http_invalid_response); | ||
| } | ||
| } | ||
|
|
||
| ParseStatusLine(line, response); | ||
|
|
||
| while (true) | ||
| { | ||
| ArraySegment<byte> line = await ReadNextLineAsync(cancellationToken).ConfigureAwait(false); | ||
| while (!TryReadNextLine(out line)) | ||
| { | ||
| if (!await FillAsync(cancellationToken).ConfigureAwait(false)) | ||
| { | ||
| throw new IOException(SR.net_http_invalid_response); | ||
| } | ||
| } | ||
|
|
||
| if (line[0] == '\r') | ||
| { | ||
| break; | ||
| } | ||
|
|
||
| ParseHeaderNameValue(line, response); | ||
| } | ||
|
|
||
|
|
@@ -639,44 +658,33 @@ private Task WriteToStreamAsync(byte[] buffer, int offset, int count, Cancellati | |
| return _stream.WriteAsync(buffer, offset, count, cancellationToken); | ||
| } | ||
|
|
||
| private async ValueTask<ArraySegment<byte>> ReadNextLineAsync(CancellationToken cancellationToken) | ||
| private bool TryReadNextLine(out ArraySegment<byte> line) | ||
| { | ||
| int searchOffset = 0; | ||
| while (true) | ||
| int remaining = _readLength - _readOffset; | ||
| int crPos = Array.IndexOf(_readBuffer, (byte)'\r', _readOffset, remaining); | ||
| if (crPos < 0) | ||
| { | ||
| int remaining = _readLength - _readOffset; | ||
| int startIndex = _readOffset + searchOffset; | ||
| int length = _readLength - startIndex; | ||
| int crPos = Array.IndexOf(_readBuffer, (byte)'\r', startIndex, length); | ||
| if (crPos < 0) | ||
| { | ||
| // Couldn't find a \r. Read more. | ||
| searchOffset = length; | ||
| await FillAsync(cancellationToken); | ||
| } | ||
| else if (crPos + 1 >= _readLength) | ||
| { | ||
| // We found a \r, but we don't have enough data buffered to read the \n. | ||
| searchOffset = length - 1; | ||
| await FillAsync(cancellationToken).ConfigureAwait(false); | ||
| } | ||
| else if (_readBuffer[crPos + 1] == '\n') | ||
| { | ||
| // We found a \r\n. Return the data up to and including it. | ||
| int lineLength = crPos - _readOffset + 2; | ||
| var result = new ArraySegment<byte>(_readBuffer, _readOffset, lineLength); | ||
| _readOffset += lineLength; | ||
| return result; | ||
| } | ||
| else | ||
| { | ||
| ThrowInvalidHttpResponse(); | ||
| } | ||
|
|
||
| if (remaining == _readLength - _readOffset) | ||
| { | ||
| throw new IOException(SR.net_http_invalid_response); | ||
| } | ||
| // Couldn't find a \r. | ||
| line = default(ArraySegment<byte>); | ||
| return false; | ||
| } | ||
| else if (crPos + 1 >= _readLength) | ||
| { | ||
| // We found a \r, but we don't have enough data buffered to read the \n. | ||
| line = default(ArraySegment<byte>); | ||
| return false; | ||
| } | ||
| else if (_readBuffer[crPos + 1] == '\n') | ||
| { | ||
| // We found a \r\n. Return the data up to and including it. | ||
| int lineLength = crPos - _readOffset + 2; | ||
| line = new ArraySegment<byte>(_readBuffer, _readOffset, lineLength); | ||
| _readOffset += lineLength; | ||
| return true; | ||
| } | ||
| else | ||
| { | ||
| throw new HttpRequestException(SR.net_http_invalid_response); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -714,7 +722,8 @@ private async Task ReadCrLfAsync(CancellationToken cancellationToken) | |
| throw new IOException(SR.net_http_invalid_response); | ||
| } | ||
|
|
||
| private Task FillAsync(CancellationToken cancellationToken) | ||
| // Returns false on EOF. | ||
| private Task<bool> FillAsync(CancellationToken cancellationToken) | ||
| { | ||
| int remaining = _readLength - _readOffset; | ||
| Debug.Assert(remaining >= 0); | ||
|
|
@@ -771,7 +780,7 @@ private Task FillAsync(CancellationToken cancellationToken) | |
| int bytesRead = t.GetAwaiter().GetResult(); | ||
| if (NetEventSource.IsEnabled) Trace($"Received {bytesRead} bytes."); | ||
| _readLength += bytesRead; | ||
| return Task.CompletedTask; | ||
| return Task.FromResult(bytesRead > 0); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FromResult doesn't return cached tasks (at least not currently). If we keep this returning private static readonly Task<bool> s_trueTask = Task.FromResult(true);
private static readonly Task<bool> s_falseTask = Task.FromResult(false);
...
return bytesRead > 0 ? s_trueTask : s_falseTask;There's an active discussion about whether FromResult should be changed to pull from the same task cache that async methods do for synchronously completing methods, and if that changes this could be undone, but for now, let's manually cache. |
||
| } | ||
| else | ||
| { | ||
|
|
@@ -783,6 +792,7 @@ private Task FillAsync(CancellationToken cancellationToken) | |
| int bytesRead = completed.GetAwaiter().GetResult(); | ||
| if (NetEventSource.IsEnabled) innerConnection.Trace($"Received {bytesRead} bytes."); | ||
| innerConnection._readLength += bytesRead; | ||
| return (bytesRead > 0); | ||
| }, this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it'll mess up the pattern here, but I'm curious if you see any throughput benefits by changing the signature to return the
ArraySegment<byte>with anout bool, or if you return a(bool, ArraySegment<byte>). I'm specifically wondering about the write barriers that may be incurred from writing to the out array (we can check the asm to verify there is one).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll give it a try. The tuple return syntax is kinda cool anyway :)