Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 144 additions & 37 deletions src/Common/MemoryBufferWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.AspNetCore.Internal
{
internal sealed class MemoryBufferWriter : IBufferWriter<byte>
internal sealed class MemoryBufferWriter : Stream, IBufferWriter<byte>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name doesn't fit anymore. How about MemoryWriter?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was waiting for you to request a rename.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doesn't the name fit? MemoryStreamWriter? Should it encapsulate everything it's implementing? 😄

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also renaming a shared source file is a bit of a chore so I'd like to avoid it as part of this PR (plus the name is fine).

Copy link
Copy Markdown
Member

@JamesNK JamesNK Apr 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MemoryStreamBufferWriter? I don't care that much 🤷‍♂️

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I added pooled in there too (I'm trolling BTW).

{
[ThreadStatic]
private static MemoryBufferWriter _cachedInstance;
Expand All @@ -18,19 +20,27 @@ internal sealed class MemoryBufferWriter : IBufferWriter<byte>
private bool _inUse;
#endif

private readonly int _segmentSize;
private readonly int _minimumSegmentSize;
private int _bytesWritten;

private List<byte[]> _fullSegments;
private byte[] _currentSegment;
private int _position;

public MemoryBufferWriter(int segmentSize = 2048)
public MemoryBufferWriter(int minimumSegmentSize = 4096)
{
_segmentSize = segmentSize;
_minimumSegmentSize = minimumSegmentSize;
}

public int Length => _bytesWritten;
public override long Length => _bytesWritten;
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

public static MemoryBufferWriter Get()
{
Expand All @@ -39,9 +49,11 @@ public static MemoryBufferWriter Get()
{
writer = new MemoryBufferWriter();
}

// Taken off the thread static
_cachedInstance = null;
else
{
// Taken off the thread static
_cachedInstance = null;
}
#if DEBUG
if (writer._inUse)
{
Expand Down Expand Up @@ -93,54 +105,87 @@ public void Advance(int count)

public Memory<byte> GetMemory(int sizeHint = 0)
{
// TODO: Use sizeHint
if (_currentSegment == null)
{
_currentSegment = ArrayPool<byte>.Shared.Rent(_segmentSize);
_position = 0;
}
else if (_position == _segmentSize)
{
if (_fullSegments == null)
{
_fullSegments = new List<byte[]>();
}
_fullSegments.Add(_currentSegment);
_currentSegment = ArrayPool<byte>.Shared.Rent(_segmentSize);
_position = 0;
}
EnsureCapacity(sizeHint);

return _currentSegment.AsMemory(_position, _currentSegment.Length - _position);
}

public Span<byte> GetSpan(int sizeHint = 0)
{
return GetMemory(sizeHint).Span;
EnsureCapacity(sizeHint);

return _currentSegment.AsSpan(_position, _currentSegment.Length - _position);
}

public Task CopyToAsync(Stream stream)
public void CopyTo(IBufferWriter<byte> destination)
{
if (_fullSegments != null)
{
// Copy full segments
var count = _fullSegments.Count;
for (var i = 0; i < count; i++)
{
destination.Write(_fullSegments[i]);
}
}

destination.Write(_currentSegment.AsSpan(0, _position));
}

public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
if (_fullSegments == null)
{
// There is only one segment so write without async
return stream.WriteAsync(_currentSegment, 0, _position);
return destination.WriteAsync(_currentSegment, 0, _position);
}

return CopyToSlowAsync(destination);
}

private void EnsureCapacity(int sizeHint)
{
// TODO: Use sizeHint
if (_currentSegment != null && _position < _currentSegment.Length)
{
// We have capacity in the current segment
return;
}

return CopyToSlowAsync(stream);
AddSegment();
}

private async Task CopyToSlowAsync(Stream stream)
private void AddSegment()
{
if (_currentSegment != null)
{
// We're adding a segment to the list
if (_fullSegments == null)
{
_fullSegments = new List<byte[]>();
}

_fullSegments.Add(_currentSegment);
}

_currentSegment = ArrayPool<byte>.Shared.Rent(_minimumSegmentSize);
_position = 0;
}

private async Task CopyToSlowAsync(Stream destination)
{
if (_fullSegments != null)
{
// Copy full segments
for (var i = 0; i < _fullSegments.Count - 1; i++)
// Copy full segments
var count = _fullSegments.Count;
for (var i = 0; i < count; i++)
{
await stream.WriteAsync(_fullSegments[i], 0, _segmentSize);
var segment = _fullSegments[i];
await destination.WriteAsync(segment, 0, segment.Length);
}
}

await stream.WriteAsync(_currentSegment, 0, _position);
await destination.WriteAsync(_currentSegment, 0, _position);
}

public byte[] ToArray()
Expand All @@ -157,11 +202,12 @@ public byte[] ToArray()
if (_fullSegments != null)
{
// Copy full segments
for (var i = 0; i < _fullSegments.Count; i++)
var count = _fullSegments.Count;
for (var i = 0; i < count; i++)
{
_fullSegments[i].CopyTo(result, totalWritten);

totalWritten += _segmentSize;
var segment = _fullSegments[i];
segment.CopyTo(result, totalWritten);
totalWritten += segment.Length;
}
}

Expand All @@ -170,5 +216,66 @@ public byte[] ToArray()

return result;
}

public override void Flush() { }
public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();

public override void WriteByte(byte value)
{
if (_currentSegment != null && (uint)_position < (uint)_currentSegment.Length)
{
_currentSegment[_position] = value;
}
else
{
AddSegment();
_currentSegment[0] = value;
}

_position++;
_bytesWritten++;
}

public override void Write(byte[] buffer, int offset, int count)
{
var position = _position;
if (_currentSegment != null && position < _currentSegment.Length - count)
{
Buffer.BlockCopy(buffer, offset, _currentSegment, position, count);

_position = position + count;
_bytesWritten += count;
}
else
{
BuffersExtensions.Write(this, buffer.AsSpan(offset, count));
}
}

#if NETCOREAPP2_1
public override void Write(ReadOnlySpan<byte> span)
{
if (_currentSegment != null && span.TryCopyTo(_currentSegment.AsSpan().Slice(_position)))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsSpan takes arguments to do the slice at the same time.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya I know.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then I have to pass the length

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_position is already length?

_currentSegment.AsSpan(0, _position)

Copy link
Copy Markdown
Member

@JamesNK JamesNK Apr 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, _position is where you start.

_currentSegment.AsSpan(_position, _currentSegment.Length - _position)? GetSpan, GetMemory do this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to do that but I can.

{
_position += span.Length;
_bytesWritten += span.Length;
}
else
{
BuffersExtensions.Write(this, span);
}
}
#endif

protected override void Dispose(bool disposing)
{
if (disposing)
{
Reset();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ public static class TextMessageFormatter
// will not occur (is not a valid character) and therefore it is safe to not escape it
public static readonly byte RecordSeparator = 0x1e;

public static void WriteRecordSeparator(Stream output)
{
output.WriteByte(RecordSeparator);
}

public static void WriteRecordSeparator(IBufferWriter<byte> output)
{
var buffer = output.GetSpan(1);
Expand Down
Loading