Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Make MemoryBufferWriter a Stream#1907

Merged
davidfowl merged 5 commits into
devfrom
davidfowl/memory-buffer-writer-stream
Apr 8, 2018
Merged

Make MemoryBufferWriter a Stream#1907
davidfowl merged 5 commits into
devfrom
davidfowl/memory-buffer-writer-stream

Conversation

@davidfowl
Copy link
Copy Markdown
Member

@davidfowl davidfowl commented Apr 8, 2018

  • Get rid of LimitArrayPoolWriteStream and use MemoryBufferWriter in its place in the MessagePackProtocol implementation.
  • Added tests for MemoryPoolBufferWriter and fixed a bug in CopyToAsync
  • Added CopyTo(IBufferWriter<byte>)
  • Changed MemoryBufferWriter to fill the underlying arrays that back segments, the segment size is now a minimum.

Latest Results

                Method |          Input | HubProtocol |        Mean |      Error |     StdDev |        Op/s |  Gen 0 | Allocated |
---------------------- |--------------- |------------ |------------:|-----------:|-----------:|------------:|-------:|----------:|
    WriteSingleMessage |   FewArguments |     MsgPack |  1,353.7 ns |  19.034 ns |  17.804 ns |   738,701.2 | 0.0134 |     472 B |
 OldWriteSingleMessage |   FewArguments |     MsgPack |  1,278.2 ns |  19.146 ns |  16.973 ns |   782,370.4 | 0.0153 |     520 B |
    WriteSingleMessage | LargeArguments |     MsgPack | 17,249.9 ns | 108.451 ns |  96.139 ns |    57,971.5 | 2.0447 |   61848 B |
 OldWriteSingleMessage | LargeArguments |     MsgPack | 16,273.1 ns | 185.642 ns | 164.566 ns |    61,450.9 | 2.0447 |   61896 B |
    WriteSingleMessage |  ManyArguments |     MsgPack |  2,494.1 ns |  29.492 ns |  26.144 ns |   400,941.9 | 0.0229 |     744 B |
 OldWriteSingleMessage |  ManyArguments |     MsgPack |  2,407.0 ns |  30.682 ns |  27.199 ns |   415,459.0 | 0.0229 |     792 B |
    WriteSingleMessage |    NoArguments |     MsgPack |    552.6 ns |   6.188 ns |   5.485 ns | 1,809,472.5 | 0.0067 |     224 B |
 OldWriteSingleMessage |    NoArguments |     MsgPack |    536.0 ns |   6.296 ns |   5.581 ns | 1,865,671.5 | 0.0086 |     272 B |

Broadcast json, 1000 connections

Before

            Method | Connections | Protocol |     Mean |     Error |    StdDev |   Median |  Op/s |  Gen 0 | Allocated |
--------------- |------------ |--------- |---------:|----------:|----------:|---------:|------:|-------:|----------:|
 SendAsyncGroup |        1000 |     json | 1.618 ms | 0.0337 ms | 0.0839 ms | 1.586 ms | 618.1 | 1.9531 |  97.78 KB |
   SendAsyncAll |        1000 |     json | 1.465 ms | 0.0117 ms | 0.0109 ms | 1.463 ms | 682.8 | 1.9531 |  65.72 KB |

After

         Method | Connections | Protocol |     Mean |     Error |    StdDev |  Op/s |  Gen 0 | Allocated |
--------------- |------------ |--------- |---------:|----------:|----------:|------:|-------:|----------:|
 SendAsyncGroup |        1000 |     json | 1.541 ms | 0.0153 ms | 0.0143 ms | 648.8 | 1.9531 |  98.67 KB |
   SendAsyncAll |        1000 |     json | 1.502 ms | 0.0176 ms | 0.0147 ms | 666.0 | 1.9531 |  77.62 KB |

Previous runs

Before

             Method |          Input | HubProtocol |        Mean |      Error |     StdDev |        Op/s |  Gen 0 | Allocated |
------------------- |--------------- |------------ |------------:|-----------:|-----------:|------------:|-------:|----------:|
 WriteSingleMessage |   FewArguments |     MsgPack |  1,347.5 ns |  25.898 ns |  24.225 ns |   742,109.1 | 0.0153 |     520 B |
 WriteSingleMessage | LargeArguments |     MsgPack | 17,434.4 ns | 342.015 ns | 432.539 ns |    57,357.9 | 2.0447 |   61896 B |
 WriteSingleMessage |  ManyArguments |     MsgPack |  2,488.1 ns |  42.467 ns |  39.724 ns |   401,920.6 | 0.0229 |     792 B |
 WriteSingleMessage |    NoArguments |     MsgPack |    548.6 ns |   5.932 ns |   5.549 ns | 1,822,692.4 | 0.0086 |     272 B |

After

             Method |          Input | HubProtocol |        Mean |      Error |     StdDev |        Op/s |  Gen 0 | Allocated |
------------------- |--------------- |------------ |------------:|-----------:|-----------:|------------:|-------:|----------:|
 WriteSingleMessage |   FewArguments |     MsgPack |  1,627.8 ns |  25.531 ns |  23.882 ns |   614,328.8 | 0.0134 |     472 B |
 WriteSingleMessage | LargeArguments |     MsgPack | 20,891.9 ns | 302.379 ns | 268.051 ns |    47,865.4 | 2.0752 |   61848 B |
 WriteSingleMessage |  ManyArguments |     MsgPack |  3,008.5 ns |  39.299 ns |  34.837 ns |   332,394.7 | 0.0229 |     744 B |
 WriteSingleMessage |    NoArguments |     MsgPack |    732.9 ns |   8.135 ns |   7.609 ns | 1,364,499.2 | 0.0067 |     224 B |

It's slower and allocates a little bit less. Going to profile it. Small note, we don't get the benefit of the thread static caching in this test because WriteToArray already passes in a MemoryPoolBufferWriter to the MessagePackProtocol.

Looks like Write and WriteByte are just generally slower:

image

Old:

image

New:

image

Code

public class Program
{
    public static void Main(string[] args)
    {
        int messages = 100000;
        Console.WriteLine($"Waiting to serialize {messages} hub messages");
        Console.ReadLine();

        var hubMessage = new InvocationMessage(target: "Target", argumentBindingException: null);
        var hubProtocol = new MessagePackHubProtocol();

        for (int i = 0; i < messages; i++)
        {
            byte[] buffer = hubProtocol.WriteToArray(hubMessage);
        }

        Console.WriteLine("Done");
        Console.ReadLine();
    }
}

After optimizations

             Method |          Input | HubProtocol |        Mean |     Error |    StdDev |        Op/s |  Gen 0 | Allocated |
------------------- |--------------- |------------ |------------:|----------:|----------:|------------:|-------:|----------:|
 WriteSingleMessage |   FewArguments |     MsgPack |  1,423.3 ns |  30.92 ns |  54.16 ns |   702,612.5 | 0.0134 |     472 B |
 WriteSingleMessage | LargeArguments |     MsgPack | 18,500.8 ns | 359.45 ns | 318.64 ns |    54,051.8 | 2.0447 |   61848 B |
 WriteSingleMessage |  ManyArguments |     MsgPack |  2,607.8 ns |  51.76 ns |  53.16 ns |   383,471.4 | 0.0229 |     744 B |
 WriteSingleMessage |    NoArguments |     MsgPack |    582.0 ns |  11.67 ns |  20.13 ns | 1,718,105.7 | 0.0067 |     224 B |

- Fixed a bug with CopyTo and CopyToAsync
@benaadams
Copy link
Copy Markdown
Contributor

Override FlushAsync as the default impl will use Task.StartNew

public virtual Task FlushAsync(CancellationToken cancellationToken)
{
    return Task.Factory.StartNew(state => ((Stream)state).Flush(), this,
        cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}

public void WriteMessage(HubMessage message, IBufferWriter<byte> output)
{
using (var stream = new LimitArrayPoolWriteStream())
var writer = MemoryBufferWriter.Get();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this write everything to array pool buffers just to get a length for prefix, then copies everything out of the buffers to the output after writing the prefix?

Copy link
Copy Markdown
Member Author

@davidfowl davidfowl Apr 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. We discussed ways to improve this but haven't enacted any concrete plan yet. The problem is the length is variable sized https://github.com/aspnet/SignalR/blob/dev/specs/HubProtocol.md#varint

Comment thread src/Common/MemoryBufferWriter.cs Outdated
}
else
{
EnsureCapacity(1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to recheck conditions

AddSegment()
_currentSegment[0] = value;
_position = 1;

Comment thread src/Common/MemoryBufferWriter.cs Outdated

public override void WriteByte(byte value)
{
if (_currentSegment != null && _position < _currentSegment.Length)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skip bounds checks

var currentSegment = _currentSegment;
var position = _position;
if (currentSegment != null && (uint)position < (uint)currentSegment.Length)
{
    currentSegment[position] = value;
    _position = position + 1;
}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually seemed to make things slower... Trying again.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need both the array and index in a local rather than working off a memory location (this._currentSegment) for it to work; otherwise the Jit won't assume the values are the same when you use them to do the indexing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was better for arrays that were memory references at one stage; but then it was made worse 😢 dotnet/coreclr#15756

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it's worse

}

_bytesWritten++;
_position++;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

_position++;

As dealt with in two branches of if

Comment thread src/Common/MemoryBufferWriter.cs Outdated
@@ -161,7 +202,7 @@ public byte[] ToArray()
{
_fullSegments[i].CopyTo(result, totalWritten);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take array to local, save on double List lookup

var segment = _fullSegments[i];
segment.CopyTo(result, totalWritten);
totalWritten += segment.Length;

Comment thread src/Common/MemoryBufferWriter.cs Outdated

public override void Write(byte[] buffer, int offset, int count)
{
if (_currentSegment != null && _position + count < _currentSegment.Length)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid overflow (argument checking aside)

_position < _currentSegment.Length - count

Comment thread src/Common/MemoryBufferWriter.cs Outdated
if (_fullSegments != null)
{
// Copy full segments
for (var i = 0; i < _fullSegments.Count; i++)
Copy link
Copy Markdown
Contributor

@benaadams benaadams Apr 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't array, so take property to register

var count = _fullSegments.Count;
for (var i = 0; i < count; i++)

Otherwise it will pull it from stack memory each loop

Comment thread src/Common/MemoryBufferWriter.cs Outdated

public override void Write(byte[] buffer, int offset, int count)
{
if (_currentSegment != null && _position + count < _currentSegment.Length)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use _position lots so take to local/register so it isn't always reading from stack memory

var position = _position;
if (_currentSegment != null && position < _currentSegment.Length - count)
{
    Buffer.BlockCopy(buffer, offset, _currentSegment, position, count);
    _position = position + count;

#if NETCOREAPP2_1
public override void Write(ReadOnlySpan<byte> span)
{
if (_currentSegment != null && span.TryCopyTo(_currentSegment.AsSpan().Slice(_position)))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsSpan takes arguments to do the slice at the same time.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya I know.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then I have to pass the length

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_position is already length?

_currentSegment.AsSpan(0, _position)

Copy link
Copy Markdown
Member

@JamesNK JamesNK Apr 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, _position is where you start.

_currentSegment.AsSpan(_position, _currentSegment.Length - _position)? GetSpan, GetMemory do this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to do that but I can.

namespace Microsoft.AspNetCore.Internal
{
internal sealed class MemoryBufferWriter : IBufferWriter<byte>
internal sealed class MemoryBufferWriter : Stream, IBufferWriter<byte>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name doesn't fit anymore. How about MemoryWriter?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was waiting for you to request a rename.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doesn't the name fit? MemoryStreamWriter? Should it encapsulate everything it's implementing? 😄

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also renaming a shared source file is a bit of a chore so I'd like to avoid it as part of this PR (plus the name is fine).

Copy link
Copy Markdown
Member

@JamesNK JamesNK Apr 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MemoryStreamBufferWriter? I don't care that much 🤷‍♂️

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I added pooled in there too (I'm trolling BTW).

@JamesNK
Copy link
Copy Markdown
Member

JamesNK commented Apr 8, 2018

I think BufferExtensions.Write is where the slow down is coming from. If we copied its code into the class and made it use the internal array data structure and _position directly rather than calling interface methods on IBufferWriter<byte> there would be a perf improvement.

@davidfowl
Copy link
Copy Markdown
Member Author

I think BufferExtensions.Write is where the slow down is coming from. If we copied its code into the class and made it use the internal array data structure and _position directly rather than calling interface methods on IBufferWriter there would be a perf improvement.

Agreed, I looked at doing that but the performance different is so minimal now, I didn't bother.

Copy link
Copy Markdown
Member

@JamesNK JamesNK left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy for AsSpan and BufferExtensions.Write optimizations to be done in the future.

@davidfowl
Copy link
Copy Markdown
Member Author

@JamesNK the future is now. Coming up after this PR.

@davidfowl davidfowl merged commit 9fd713c into dev Apr 8, 2018
@benaadams
Copy link
Copy Markdown
Contributor

benaadams commented Apr 8, 2018

I think BufferExtensions.Write is where the slow down is coming from.

It should be marked as AgressiveInline; its already split into Write and WriteMultiSegment for this sort of purpose; and then hopefully the Jit would also devirtualize the writer via the inline as they are non-virtual methods (and anyway its a sealed class)

@benaadams
Copy link
Copy Markdown
Contributor

Inlines dotnet/corefx#28928

@benaadams
Copy link
Copy Markdown
Contributor

I think BufferExtensions.Write is where the slow down is coming from.

It should be marked as ...

Perhaps is better as you suggest @JamesNK as it is simplier #1918

@davidfowl davidfowl deleted the davidfowl/memory-buffer-writer-stream branch April 11, 2018 10:47
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants