Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions csharp/src/Apache.Arrow/BitUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ public static int CountBits(ReadOnlySpan<byte> data)
public static long RoundUpToMultipleOf64(long n) =>
RoundUpToMultiplePowerOfTwo(n, 64);

/// <summary>
/// Rounds an integer up to the nearest multiple of 8.
/// </summary>
/// <remarks>
/// Thin wrapper that forwards to <c>RoundUpToMultiplePowerOfTwo</c> with a factor of 8.
/// </remarks>
/// <param name="n">Integer to round.</param>
/// <returns>Integer rounded up to the nearest multiple of 8.</returns>
public static long RoundUpToMultipleOf8(long n) =>
RoundUpToMultiplePowerOfTwo(n, 8);

/// <summary>
/// Rounds an integer up to the nearest multiple of factor, where
/// factor must be a power of two.
Expand Down
30 changes: 19 additions & 11 deletions csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,15 @@ internal class ArrowRecordBatchFlatBufferBuilder :
IArrowArrayVisitor<StringArray>,
IArrowArrayVisitor<BinaryArray>
{
public struct Buffer
public readonly struct Buffer
{
public readonly ArrowBuffer DataBuffer;
public readonly int Offset;
public readonly int Length;

public Buffer(ArrowBuffer buffer, int offset, int length)
public Buffer(ArrowBuffer buffer, int offset)
{
DataBuffer = buffer;
Offset = offset;
Length = length;
}
}

Expand Down Expand Up @@ -124,9 +122,10 @@ private Buffer CreateBuffer(ArrowBuffer buffer)
{
var offset = TotalLength;

TotalLength += buffer.Length;
int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length));
TotalLength += paddedLength;

return new Buffer(buffer, offset, buffer.Length);
return new Buffer(buffer, offset);
}

public void Visit(IArrowArray array)
Expand Down Expand Up @@ -215,7 +214,7 @@ private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBat
for (var i = buffers.Count - 1; i >= 0; i--)
{
Flatbuf.Buffer.CreateBuffer(Builder,
buffers[i].Offset, buffers[i].Length);
buffers[i].Offset, buffers[i].DataBuffer.Length);
}

var buffersVectorOffset = Builder.EndVector();
Expand All @@ -238,11 +237,20 @@ private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBat

for (var i = 0; i < buffers.Count; i++)
{
if (buffers[i].DataBuffer.IsEmpty)
ArrowBuffer buffer = buffers[i].DataBuffer;
if (buffer.IsEmpty)
continue;

await WriteBufferAsync(buffers[i].DataBuffer, cancellationToken).ConfigureAwait(false);
bodyLength += buffers[i].DataBuffer.Length;
await WriteBufferAsync(buffer, cancellationToken).ConfigureAwait(false);

int paddedLength = checked((int)BitUtility.RoundUpToMultipleOf8(buffer.Length));
int padding = paddedLength - buffer.Length;
if (padding > 0)
{
await WritePaddingAsync(padding).ConfigureAwait(false);
}

bodyLength += paddedLength;
}

// Write padding so the record batch message body length is a multiple of 8 bytes
Expand Down Expand Up @@ -332,7 +340,7 @@ private async ValueTask<long> WriteMessageAsync<T>(
where T: struct
{
var messageOffset = Flatbuf.Message.CreateMessage(
Builder, CurrentMetadataVersion, headerType, headerOffset.Value,
Builder, CurrentMetadataVersion, headerType, headerOffset.Value,
bodyLength);

Builder.Finish(messageOffset.Value);
Expand Down
57 changes: 57 additions & 0 deletions csharp/test/Apache.Arrow.Benchmarks/ArrowWriterBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Apache.Arrow.Ipc;
using Apache.Arrow.Tests;
using BenchmarkDotNet.Attributes;
using System.IO;
using System.Threading.Tasks;

namespace Apache.Arrow.Benchmarks
{
//[EtwProfiler] - needs elevated privileges
/// <summary>
/// Benchmarks writing a single sample record batch to an in-memory stream
/// through <see cref="ArrowStreamWriter"/>.
/// </summary>
[MemoryDiagnoser]
public class ArrowWriterBenchmark
{
    /// <summary>
    /// Length passed to <c>TestData.CreateSampleRecordBatch</c> when building the batch.
    /// </summary>
    [Params(10_000, 1_000_000)]
    public int BatchLength { get; set; }

    /// <summary>
    /// Column-set count passed to <c>TestData.CreateSampleRecordBatch</c> when building the batch.
    /// </summary>
    [Params(10, 25)]
    public int ColumnSetCount { get; set; }

    private MemoryStream _memoryStream;
    private RecordBatch _batch;

    /// <summary>
    /// Builds the record batch and the destination stream once per parameter combination.
    /// </summary>
    [GlobalSetup]
    public void GlobalSetup()
    {
        _batch = TestData.CreateSampleRecordBatch(BatchLength, ColumnSetCount);
        _memoryStream = new MemoryStream();
    }

    /// <summary>
    /// Rewinds the shared stream so every iteration writes from the start.
    /// </summary>
    [IterationSetup]
    public void Setup()
    {
        _memoryStream.Position = 0;
    }

    /// <summary>
    /// Writes the prepared record batch to the shared memory stream.
    /// </summary>
    [Benchmark]
    public async Task WriteBatch()
    {
        // leaveOpen: true keeps the shared MemoryStream usable for the next
        // iteration while still disposing the writer deterministically.
        using (var writer = new ArrowStreamWriter(_memoryStream, _batch.Schema, leaveOpen: true))
        {
            await writer.WriteRecordBatchAsync(_batch);
        }
    }
}
}
56 changes: 56 additions & 0 deletions csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

using Apache.Arrow.Ipc;
using Apache.Arrow.Types;
using System;
using System.IO;
using System.Linq;
Expand Down Expand Up @@ -139,5 +140,60 @@ private static async Task TestRoundTripRecordBatch(RecordBatch originalBatch)
}
}
}

/// <summary>
/// Writes a batch of two single-value Int32 columns and checks that the
/// trailing bytes of the stream hold each 4-byte data buffer followed by
/// zero padding up to an 8-byte boundary.
/// </summary>
[Fact]
public async Task WriteBatchWithCorrectPadding()
{
    byte firstMarker = 0x04;
    byte secondMarker = 0x14;

    Schema schema = new Schema.Builder()
        .Field(f => f.Name("age").DataType(Int32Type.Default))
        .Field(f => f.Name("characterCount").DataType(Int32Type.Default))
        .Build();

    var firstColumn = new Int32Array(
        new ArrowBuffer(new byte[] { firstMarker, firstMarker, 0x00, 0x00 }),
        ArrowBuffer.Empty,
        length: 1,
        nullCount: 0,
        offset: 0);

    var secondColumn = new Int32Array(
        new ArrowBuffer(new byte[] { secondMarker, secondMarker, 0x00, 0x00 }),
        ArrowBuffer.Empty,
        length: 1,
        nullCount: 0,
        offset: 0);

    var batch = new RecordBatch(
        schema,
        new IArrowArray[] { firstColumn, secondColumn },
        length: 1);

    await TestRoundTripRecordBatch(batch);

    using (MemoryStream stream = new MemoryStream())
    {
        using (var writer = new ArrowStreamWriter(stream, batch.Schema, leaveOpen: true))
        {
            await writer.WriteRecordBatchAsync(batch);
        }

        byte[] written = stream.ToArray();
        int end = written.Length;

        // ensure that the data buffers at the end are 8-byte aligned:
        // the last 16 bytes are two 8-byte slots, one per data buffer.
        Assert.Equal(firstMarker, written[end - 16]);
        Assert.Equal(firstMarker, written[end - 15]);
        for (int index = end - 14; index < end - 8; index++)
        {
            Assert.Equal(0, written[index]);
        }

        Assert.Equal(secondMarker, written[end - 8]);
        Assert.Equal(secondMarker, written[end - 7]);
        for (int index = end - 6; index < end; index++)
        {
            Assert.Equal(0, written[index]);
        }
    }
}
}
}
50 changes: 29 additions & 21 deletions csharp/test/Apache.Arrow.Tests/TestData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,35 @@ namespace Apache.Arrow.Tests
public static class TestData
{
/// <summary>
/// Creates a sample record batch containing a single set of columns.
/// </summary>
/// <param name="length">Length forwarded to the two-argument overload.</param>
/// <returns>The record batch produced by the two-argument overload with one column set.</returns>
public static RecordBatch CreateSampleRecordBatch(int length) =>
    CreateSampleRecordBatch(length, columnSetCount: 1);

public static RecordBatch CreateSampleRecordBatch(int length, int columnSetCount)
{
Schema.Builder builder = new Schema.Builder();
builder.Field(CreateField(BooleanType.Default));
builder.Field(CreateField(UInt8Type.Default));
builder.Field(CreateField(Int8Type.Default));
builder.Field(CreateField(UInt16Type.Default));
builder.Field(CreateField(Int16Type.Default));
builder.Field(CreateField(UInt32Type.Default));
builder.Field(CreateField(Int32Type.Default));
builder.Field(CreateField(UInt64Type.Default));
builder.Field(CreateField(Int64Type.Default));
builder.Field(CreateField(FloatType.Default));
builder.Field(CreateField(DoubleType.Default));
//builder.Field(CreateField(new DecimalType(19, 2)));
//builder.Field(CreateField(HalfFloatType.Default));
//builder.Field(CreateField(StringType.Default));
//builder.Field(CreateField(Date32Type.Default));
//builder.Field(CreateField(Date64Type.Default));
//builder.Field(CreateField(Time32Type.Default));
//builder.Field(CreateField(Time64Type.Default));
//builder.Field(CreateField(TimestampType.Default));
for (int i = 0; i < columnSetCount; i++)
{
builder.Field(CreateField(BooleanType.Default, i));
builder.Field(CreateField(UInt8Type.Default, i));
builder.Field(CreateField(Int8Type.Default, i));
builder.Field(CreateField(UInt16Type.Default, i));
builder.Field(CreateField(Int16Type.Default, i));
builder.Field(CreateField(UInt32Type.Default, i));
builder.Field(CreateField(Int32Type.Default, i));
builder.Field(CreateField(UInt64Type.Default, i));
builder.Field(CreateField(Int64Type.Default, i));
builder.Field(CreateField(FloatType.Default, i));
builder.Field(CreateField(DoubleType.Default, i));
//builder.Field(CreateField(new DecimalType(19, 2)));
//builder.Field(CreateField(HalfFloatType.Default));
//builder.Field(CreateField(StringType.Default));
//builder.Field(CreateField(Date32Type.Default));
//builder.Field(CreateField(Date64Type.Default));
//builder.Field(CreateField(Time32Type.Default));
//builder.Field(CreateField(Time64Type.Default));
//builder.Field(CreateField(TimestampType.Default));
}

Schema schema = builder.Build();

Expand All @@ -51,9 +59,9 @@ public static RecordBatch CreateSampleRecordBatch(int length)
return new RecordBatch(schema, arrays, length);
}

private static Field CreateField(ArrowType type)
private static Field CreateField(ArrowType type, int iteration)
{
return new Field(type.Name, type, nullable: false);
return new Field(type.Name + iteration, type, nullable: false);
}

private static IEnumerable<IArrowArray> CreateArrays(Schema schema, int length)
Expand Down