Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
case BODY_TAG:
if (body != null) {
// only read last body.
body.release();
body.getReferenceManager().release();
body = null;
}
int size = readRawVarint32(stream);
Expand Down Expand Up @@ -271,7 +271,7 @@ private InputStream asInputStream(BufferAllocator allocator) {
int size = 0;
List<ByteBuf> allBufs = new ArrayList<>();
for (ArrowBuf b : bufs) {
allBufs.add(b);
allBufs.add(b.asNettyBuffer());
size += b.readableBytes();
// [ARROW-4213] These buffers must be aligned to an 8-byte boundary in order to be readable from C++.
if (b.readableBytes() % 8 != 0) {
Expand All @@ -288,7 +288,7 @@ private InputStream asInputStream(BufferAllocator allocator) {
ArrowBuf initialBuf = allocator.buffer(baos.size());
initialBuf.writeBytes(baos.toByteArray());
final CompositeByteBuf bb = new CompositeByteBuf(allocator.getAsByteBufAllocator(), true, bufs.size() + 1,
ImmutableList.<ByteBuf>builder().add(initialBuf).addAll(allBufs).build());
ImmutableList.<ByteBuf>builder().add(initialBuf.asNettyBuffer()).addAll(allBufs).build());
final ByteBufInputStream is = new DrainableByteBufInputStream(bb);
return is;
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ void releaseRecordBatch(ArrowRecordBatch recordBatch) {
List<ArrowBuf> buffers = recordBatch.getBuffers();
recordBatch.close();
for (ArrowBuf buf : buffers) {
buf.release();
buf.getReferenceManager().release();
}
}

Expand Down
Loading