Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void Visit(BinaryArray array)

private void CreateBuffers(BooleanArray array)
{
_buffers.Add(CreateBuffer(ArrowBuffer.Empty));
_buffers.Add(CreateBuffer(array.NullBitmapBuffer));
_buffers.Add(CreateBuffer(array.ValueBuffer));
}

Expand Down Expand Up @@ -181,28 +181,26 @@ private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBat
HasWrittenSchema = true;
}

var recordBatchBuilder = new ArrowRecordBatchFlatBufferBuilder();

Builder.Clear();

// Serialize field nodes

var fieldCount = Schema.Fields.Count;
var fieldNodeOffsets = new Offset<Flatbuf.FieldNode>[fieldCount];

Flatbuf.RecordBatch.StartNodesVector(Builder, fieldCount);

for (var i = 0; i < fieldCount; i++)
// flatbuffer struct vectors have to be created in reverse order
for (var i = fieldCount - 1; i >= 0; i--)
{
var fieldArray = recordBatch.Column(i);
fieldNodeOffsets[i] =
Flatbuf.FieldNode.CreateFieldNode(Builder, fieldArray.Length, fieldArray.NullCount);
Flatbuf.FieldNode.CreateFieldNode(Builder, fieldArray.Length, fieldArray.NullCount);
}

var fieldNodesVectorOffset = Builder.EndVector();

// Serialize buffers

var recordBatchBuilder = new ArrowRecordBatchFlatBufferBuilder();
for (var i = 0; i < fieldCount; i++)
{
var fieldArray = recordBatch.Column(i);
Expand All @@ -213,6 +211,7 @@ private protected async Task WriteRecordBatchInternalAsync(RecordBatch recordBat

Flatbuf.RecordBatch.StartBuffersVector(Builder, buffers.Count);

// flatbuffer struct vectors have to be created in reverse order
for (var i = buffers.Count - 1; i >= 0; i--)
{
Flatbuf.Buffer.CreateBuffer(Builder,
Expand Down
34 changes: 34 additions & 0 deletions csharp/test/Apache.Arrow.Tests/ArrowStreamWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Apache.Arrow.Ipc;
using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
Expand Down Expand Up @@ -89,6 +90,39 @@ public async Task WriteEmptyBatch()
{
RecordBatch originalBatch = TestData.CreateSampleRecordBatch(length: 0);

await TestRoundTripRecordBatch(originalBatch);
}

[Fact]
public async Task WriteBatchWithNulls()
{
RecordBatch originalBatch = new RecordBatch.Builder()
.Append("Column1", false, col => col.Int32(array => array.AppendRange(Enumerable.Range(0, 10))))
.Append("Column2", true, new Int32Array(
valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
length: 10,
nullCount: 2,
offset: 0))
.Append("Column3", true, new Int32Array(
valueBuffer: new ArrowBuffer.Builder<int>().AppendRange(Enumerable.Range(0, 10)).Build(),
nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0x00).Append(0x00).Build(),
length: 10,
nullCount: 10,
offset: 0))
.Append("NullableBooleanColumn", true, new BooleanArray(
valueBuffer: new ArrowBuffer.Builder<byte>().Append(0xfd).Append(0xff).Build(),
nullBitmapBuffer: new ArrowBuffer.Builder<byte>().Append(0xed).Append(0xff).Build(),
length: 10,
nullCount: 3,
offset: 0))
.Build();

await TestRoundTripRecordBatch(originalBatch);
}

private static async Task TestRoundTripRecordBatch(RecordBatch originalBatch)
{
using (MemoryStream stream = new MemoryStream())
{
using (var writer = new ArrowStreamWriter(stream, originalBatch.Schema, leaveOpen: true))
Expand Down