From 7ecda78c683070d72623bd4aef67720ed4e983f8 Mon Sep 17 00:00:00 2001 From: Eric Erhardt Date: Wed, 10 Jul 2019 21:48:49 -0500 Subject: [PATCH 1/2] Ensure 8-byte alignment on each buffer in a RecordBatch. --- csharp/src/Apache.Arrow/BitUtility.cs | 8 +++ .../src/Apache.Arrow/Ipc/ArrowStreamWriter.cs | 15 +++-- .../ArrowStreamWriterTests.cs | 56 +++++++++++++++++++ 3 files changed, 74 insertions(+), 5 deletions(-) diff --git a/csharp/src/Apache.Arrow/BitUtility.cs b/csharp/src/Apache.Arrow/BitUtility.cs index a5da46bb1c9..7d2cfbfd365 100644 --- a/csharp/src/Apache.Arrow/BitUtility.cs +++ b/csharp/src/Apache.Arrow/BitUtility.cs @@ -116,6 +116,14 @@ public static int CountBits(ReadOnlySpan data) public static long RoundUpToMultipleOf64(long n) => RoundUpToMultiplePowerOfTwo(n, 64); + /// + /// Rounds an integer to the nearest multiple of 8. + /// + /// Integer to round. + /// Integer rounded to the nearest multiple of 8. + public static long RoundUpToMultipleOf8(long n) => + RoundUpToMultiplePowerOfTwo(n, 8); + /// /// Rounds an integer up to the nearest multiple of factor, where /// factor must be a power of two. diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs index 84881751126..f0b29fe50ed 100644 --- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs +++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs @@ -45,17 +45,19 @@ internal class ArrowRecordBatchFlatBufferBuilder : IArrowArrayVisitor, IArrowArrayVisitor { - public struct Buffer + public readonly struct Buffer { public readonly ArrowBuffer DataBuffer; public readonly int Offset; public readonly int Length; + public readonly int Padding; - public Buffer(ArrowBuffer buffer, int offset, int length) + public Buffer(ArrowBuffer buffer, int offset, int length, int padding) { DataBuffer = buffer; Offset = offset; Length = length; + Padding = padding; } } @@ -124,9 +126,11 @@ private Buffer CreateBuffer(ArrowBuffer buffer) { var offset = TotalLength; - TotalLength += buffer.Length; + int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length)); + TotalLength += paddedLength; - return new Buffer(buffer, offset, buffer.Length); + int padding = paddedLength - buffer.Length; + return new Buffer(buffer, offset, buffer.Length, padding); } public void Visit(IArrowArray array) @@ -242,7 +246,8 @@ private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBat continue; await WriteBufferAsync(buffers[i].DataBuffer, cancellationToken).ConfigureAwait(false); - bodyLength += buffers[i].DataBuffer.Length; + await WritePaddingAsync(buffers[i].Padding).ConfigureAwait(false); + bodyLength += buffers[i].DataBuffer.Length + buffers[i].Padding; } // Write padding so the record batch message body length is a multiple of 8 bytes diff --git a/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs index 83a97f3b751..06be8bd6504 100644 --- a/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs +++ b/csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs @@ -14,6 +14,7 @@ // limitations under the License. using Apache.Arrow.Ipc; +using Apache.Arrow.Types; using System; using System.IO; using System.Linq; @@ -139,5 +140,60 @@ private static async Task TestRoundTripRecordBatch(RecordBatch originalBatch) } } } + + [Fact] + public async Task WriteBatchWithCorrectPadding() + { + byte value1 = 0x04; + byte value2 = 0x14; + var batch = new RecordBatch( + new Schema.Builder() + .Field(f => f.Name("age").DataType(Int32Type.Default)) + .Field(f => f.Name("characterCount").DataType(Int32Type.Default)) + .Build(), + new IArrowArray[] + { + new Int32Array( + new ArrowBuffer(new byte[] { value1, value1, 0x00, 0x00 }), + ArrowBuffer.Empty, + length: 1, + nullCount: 0, + offset: 0), + new Int32Array( + new ArrowBuffer(new byte[] { value2, value2, 0x00, 0x00 }), + ArrowBuffer.Empty, + length: 1, + nullCount: 0, + offset: 0) + }, + length: 1); + + await TestRoundTripRecordBatch(batch); + + using (MemoryStream stream = new MemoryStream()) + { + using (var writer = new ArrowStreamWriter(stream, batch.Schema, leaveOpen: true)) + { + await writer.WriteRecordBatchAsync(batch); + } + + byte[] writtenBytes = stream.ToArray(); + + // ensure that the data buffers at the end are 8-byte aligned + Assert.Equal(value1, writtenBytes[writtenBytes.Length - 16]); + Assert.Equal(value1, writtenBytes[writtenBytes.Length - 15]); + for (int i = 14; i > 8; i--) + { + Assert.Equal(0, writtenBytes[writtenBytes.Length - i]); + } + + Assert.Equal(value2, writtenBytes[writtenBytes.Length - 8]); + Assert.Equal(value2, writtenBytes[writtenBytes.Length - 7]); + for (int i = 6; i > 0; i--) + { + Assert.Equal(0, writtenBytes[writtenBytes.Length - i]); + } + } + } } } From 76807e9381cd6bdedf0b58f297d6ff6776c36f66 Mon Sep 17 00:00:00 2001 From: Eric Erhardt Date: Thu, 11 Jul 2019 11:05:36 -0500 Subject: [PATCH 2/2] PR feedback Recalculate padding instead of allocating more memory. --- .../src/Apache.Arrow/Ipc/ArrowStreamWriter.cs | 29 +++++----- .../ArrowWriterBenchmark.cs | 57 +++++++++++++++++++ csharp/test/Apache.Arrow.Tests/TestData.cs | 50 +++++++++------- 3 files changed, 102 insertions(+), 34 deletions(-) create mode 100644 csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs diff --git a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs index f0b29fe50ed..e1da4489ce4 100644 --- a/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs +++ b/csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs @@ -49,15 +49,11 @@ public readonly struct Buffer { public readonly ArrowBuffer DataBuffer; public readonly int Offset; - public readonly int Length; - public readonly int Padding; - public Buffer(ArrowBuffer buffer, int offset, int length, int padding) + public Buffer(ArrowBuffer buffer, int offset) { DataBuffer = buffer; Offset = offset; - Length = length; - Padding = padding; } } @@ -129,8 +125,7 @@ private Buffer CreateBuffer(ArrowBuffer buffer) int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length)); TotalLength += paddedLength; - int padding = paddedLength - buffer.Length; - return new Buffer(buffer, offset, buffer.Length, padding); + return new Buffer(buffer, offset); } public void Visit(IArrowArray array) @@ -219,7 +214,7 @@ private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBat for (var i = buffers.Count - 1; i >= 0; i--) { Flatbuf.Buffer.CreateBuffer(Builder, - buffers[i].Offset, buffers[i].Length); + buffers[i].Offset, buffers[i].DataBuffer.Length); } var buffersVectorOffset = Builder.EndVector(); @@ -242,12 +237,20 @@ private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBat for (var i = 0; i < buffers.Count; i++) { - if (buffers[i].DataBuffer.IsEmpty) + ArrowBuffer buffer = buffers[i].DataBuffer; + if (buffer.IsEmpty) continue; - await WriteBufferAsync(buffers[i].DataBuffer, cancellationToken).ConfigureAwait(false); - await WritePaddingAsync(buffers[i].Padding).ConfigureAwait(false); - bodyLength += buffers[i].DataBuffer.Length + buffers[i].Padding; + await WriteBufferAsync(buffer, cancellationToken).ConfigureAwait(false); + + int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length)); + int padding = paddedLength - buffer.Length; + if (padding > 0) + { + await WritePaddingAsync(padding).ConfigureAwait(false); + } + + bodyLength += paddedLength; } // Write padding so the record batch message body length is a multiple of 8 bytes @@ -337,7 +340,7 @@ private async ValueTask WriteMessageAsync( where T: struct { var messageOffset = Flatbuf.Message.CreateMessage( - Builder, CurrentMetadataVersion, headerType, headerOffset.Value, + Builder, CurrentMetadataVersion, headerType, headerOffset.Value, bodyLength); Builder.Finish(messageOffset.Value); diff --git a/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs b/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs new file mode 100644 index 00000000000..5b19486ebaf --- /dev/null +++ b/csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Apache.Arrow.Ipc; +using Apache.Arrow.Tests; +using BenchmarkDotNet.Attributes; +using System.IO; +using System.Threading.Tasks; + +namespace Apache.Arrow.Benchmarks +{ + //[EtwProfiler] - needs elevated privileges + [MemoryDiagnoser] + public class ArrowWriterBenchmark + { + [Params(10_000, 1_000_000)] + public int BatchLength{ get; set; } + + [Params(10, 25)] + public int ColumnSetCount { get; set; } + + private MemoryStream _memoryStream; + private RecordBatch _batch; + + [GlobalSetup] + public void GlobalSetup() + { + _batch = TestData.CreateSampleRecordBatch(BatchLength, ColumnSetCount); + _memoryStream = new MemoryStream(); + } + + [IterationSetup] + public void Setup() + { + _memoryStream.Position = 0; + } + + [Benchmark] + public async Task WriteBatch() + { + ArrowStreamWriter writer = new ArrowStreamWriter(_memoryStream, _batch.Schema); + await writer.WriteRecordBatchAsync(_batch); + } + } +} diff --git a/csharp/test/Apache.Arrow.Tests/TestData.cs b/csharp/test/Apache.Arrow.Tests/TestData.cs index 1bc046dd74c..15774a75c43 100644 --- a/csharp/test/Apache.Arrow.Tests/TestData.cs +++ b/csharp/test/Apache.Arrow.Tests/TestData.cs @@ -22,27 +22,35 @@ namespace Apache.Arrow.Tests public static class TestData { public static RecordBatch CreateSampleRecordBatch(int length) + { + return CreateSampleRecordBatch(length, columnSetCount: 1); + } + + public static RecordBatch CreateSampleRecordBatch(int length, int columnSetCount) { Schema.Builder builder = new Schema.Builder(); - builder.Field(CreateField(BooleanType.Default)); - builder.Field(CreateField(UInt8Type.Default)); - builder.Field(CreateField(Int8Type.Default)); - builder.Field(CreateField(UInt16Type.Default)); - builder.Field(CreateField(Int16Type.Default)); - builder.Field(CreateField(UInt32Type.Default)); - builder.Field(CreateField(Int32Type.Default)); - builder.Field(CreateField(UInt64Type.Default)); - builder.Field(CreateField(Int64Type.Default)); - builder.Field(CreateField(FloatType.Default)); - builder.Field(CreateField(DoubleType.Default)); - //builder.Field(CreateField(new DecimalType(19, 2))); - //builder.Field(CreateField(HalfFloatType.Default)); - //builder.Field(CreateField(StringType.Default)); - //builder.Field(CreateField(Date32Type.Default)); - //builder.Field(CreateField(Date64Type.Default)); - //builder.Field(CreateField(Time32Type.Default)); - //builder.Field(CreateField(Time64Type.Default)); - //builder.Field(CreateField(TimestampType.Default)); + for (int i = 0; i < columnSetCount; i++) + { + builder.Field(CreateField(BooleanType.Default, i)); + builder.Field(CreateField(UInt8Type.Default, i)); + builder.Field(CreateField(Int8Type.Default, i)); + builder.Field(CreateField(UInt16Type.Default, i)); + builder.Field(CreateField(Int16Type.Default, i)); + builder.Field(CreateField(UInt32Type.Default, i)); + builder.Field(CreateField(Int32Type.Default, i)); + builder.Field(CreateField(UInt64Type.Default, i)); + builder.Field(CreateField(Int64Type.Default, i)); + builder.Field(CreateField(FloatType.Default, i)); + builder.Field(CreateField(DoubleType.Default, i)); + //builder.Field(CreateField(new DecimalType(19, 2))); + //builder.Field(CreateField(HalfFloatType.Default)); + //builder.Field(CreateField(StringType.Default)); + //builder.Field(CreateField(Date32Type.Default)); + //builder.Field(CreateField(Date64Type.Default)); + //builder.Field(CreateField(Time32Type.Default)); + //builder.Field(CreateField(Time64Type.Default)); + //builder.Field(CreateField(TimestampType.Default)); + } Schema schema = builder.Build(); @@ -51,9 +59,9 @@ public static RecordBatch CreateSampleRecordBatch(int length) return new RecordBatch(schema, arrays, length); } - private static Field CreateField(ArrowType type) + private static Field CreateField(ArrowType type, int iteration) { - return new Field(type.Name, type, nullable: false); + return new Field(type.Name + iteration, type, nullable: false); } private static IEnumerable CreateArrays(Schema schema, int length)