Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void consume(InputStream is) throws IOException {
ArrowBuf offsetBuffer = vector.getOffsetBuffer();
int startIndex = offsetBuffer.getInt(currentIndex * 4);
while ((read = is.read(bytes)) != -1) {
while ((dataBuffer.writerIndex() + read) > dataBuffer.capacity()) {
while ((startIndex + totalBytes + read) > dataBuffer.capacity()) {
vector.reallocDataBuffer();
}
PlatformDependent.copyMemory(bytes, 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void consume(ResultSet resultSet) throws SQLException {
String str = clob.getSubString(read, readSize);
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);

while ((dataBuffer.writerIndex() + bytes.length) > dataBuffer.capacity()) {
while ((startIndex + totalBytes + bytes.length) > dataBuffer.capacity()) {
vector.reallocDataBuffer();
}
PlatformDependent.copyMemory(bytes, 0,
Expand Down Expand Up @@ -141,7 +141,7 @@ public void consume(ResultSet resultSet) throws SQLException {
String str = clob.getSubString(read, readSize);
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);

while ((dataBuffer.writerIndex() + bytes.length) > dataBuffer.capacity()) {
while ((startIndex + totalBytes + bytes.length) > dataBuffer.capacity()) {
vector.reallocDataBuffer();
}
PlatformDependent.copyMemory(bytes, 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ private InputStream asInputStream(BufferAllocator allocator) {

if (appMetadata != null && appMetadata.capacity() > 0) {
// Must call slice() as CodedOutputStream#writeByteBuffer writes -capacity- bytes, not -limit- bytes
cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER, appMetadata.asNettyBuffer().nioBuffer().slice());
cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER,
appMetadata.asNettyBuffer().nioBuffer(0, appMetadata.capacity()).slice());
// This is weird, but implicitly, writing an ArrowMessage frees any references it has
appMetadata.getReferenceManager().release();
}
Expand All @@ -305,11 +306,11 @@ private InputStream asInputStream(BufferAllocator allocator) {
int size = 0;
List<ByteBuf> allBufs = new ArrayList<>();
for (ArrowBuf b : bufs) {
allBufs.add(b.asNettyBuffer());
size += b.readableBytes();
allBufs.add(b.asNettyBuffer().writerIndex(b.capacity()));
size += b.capacity();
// [ARROW-4213] These buffers must be aligned to an 8-byte boundary in order to be readable from C++.
if (b.readableBytes() % 8 != 0) {
int paddingBytes = 8 - (b.readableBytes() % 8);
if (b.capacity() % 8 != 0) {
int paddingBytes = 8 - (b.capacity() % 8);
assert paddingBytes > 0 && paddingBytes < 8;
size += paddingBytes;
allBufs.add(PADDING_BUFFERS.get(paddingBytes).retain());
Expand All @@ -320,9 +321,11 @@ private InputStream asInputStream(BufferAllocator allocator) {
cos.flush();

ArrowBuf initialBuf = allocator.buffer(baos.size());
initialBuf.writeBytes(baos.toByteArray());
final CompositeByteBuf bb = new CompositeByteBuf(allocator.getAsByteBufAllocator(), true, bufs.size() + 1,
ImmutableList.<ByteBuf>builder().add(initialBuf.asNettyBuffer()).addAll(allBufs).build());
initialBuf.setBytes(0, baos.toByteArray());
final CompositeByteBuf bb = new CompositeByteBuf(allocator.getAsByteBufAllocator(), true,
bufs.size() + 1, ImmutableList.<ByteBuf>builder()
.add(initialBuf.asNettyBuffer().writerIndex(baos.size()))
.addAll(allBufs).build());
// Implicitly, transfer ownership of our buffers to the input stream (which will decrement the refcount when done)
final ByteBufInputStream is = new DrainableByteBufInputStream(bb);
return is;
Expand Down
14 changes: 9 additions & 5 deletions java/flight/src/main/java/org/apache/arrow/flight/PutResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.arrow.flight;

import java.nio.ByteBuffer;

import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReferenceManager;
Expand Down Expand Up @@ -81,11 +83,13 @@ Flight.PutResult toProtocol() {
*/
static PutResult fromProtocol(BufferAllocator allocator, Flight.PutResult message) {
final ArrowBuf buf = allocator.buffer(message.getAppMetadata().size());
message.getAppMetadata().asReadOnlyByteBufferList().forEach(bb -> {
buf.setBytes(buf.writerIndex(), bb);
buf.writerIndex(buf.writerIndex() + bb.limit());
});
return new PutResult(buf);

int writerIndex = 0;
for (ByteBuffer bb : message.getAppMetadata().asReadOnlyByteBufferList()) {
buf.setBytes(writerIndex, bb);
writerIndex += bb.limit();
}
return new PutResult(buf.slice(0, writerIndex));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void sendTo(BufferAllocator allocator, ServerStreamListener listener) {
for (ArrowRecordBatch batch : batches) {
final byte[] rawMetadata = Integer.toString(counter).getBytes(StandardCharsets.UTF_8);
final ArrowBuf metadata = allocator.buffer(rawMetadata.length);
metadata.writeBytes(rawMetadata);
metadata.setBytes(0, rawMetadata);
loader.load(batch);
// Transfers ownership of the buffer - do not free buffer ourselves
listener.putNext(metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ private static void testStream(BufferAllocator allocator, Location server, Fligh

@Override
public void onNext(PutResult val) {
final byte[] metadataRaw = new byte[val.getApplicationMetadata().readableBytes()];
val.getApplicationMetadata().readBytes(metadataRaw);
final byte[] metadataRaw = new byte[val.getApplicationMetadata().capacity()];
val.getApplicationMetadata().getBytes(0, metadataRaw);
final String metadata = new String(metadataRaw, StandardCharsets.UTF_8);
if (!Integer.toString(counter).equals(metadata)) {
throw new RuntimeException(
Expand All @@ -123,10 +123,12 @@ public void onNext(PutResult val) {
}
});
int counter = 0;
int writerIndex = 0;
while (reader.read(root)) {
final byte[] rawMetadata = Integer.toString(counter).getBytes(StandardCharsets.UTF_8);
final ArrowBuf metadata = allocator.buffer(rawMetadata.length);
metadata.writeBytes(rawMetadata);
metadata.setBytes(writerIndex, rawMetadata);
writerIndex += rawMetadata.length;
// Transfers ownership of the buffer, so do not release it ourselves
stream.putNext(metadata);
try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public static void readIntoBuffer(final InputStream stream, final ArrowBuf buf,
} else {
byte[] heapBytes = new byte[size];
ByteStreams.readFully(stream, heapBytes);
buf.writeBytes(heapBytes);
buf.setBytes(0, heapBytes);
}
buf.writerIndex(size);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void onNext(PutResult val) {
for (byte i = 0; i < 10; i++) {
final IntVector vector = (IntVector) root.getVector("a");
final ArrowBuf metadata = allocator.buffer(1);
metadata.writeByte(i);
metadata.setByte(i, i);
vector.set(0, 10);
vector.setValueCount(1);
root.setRowCount(1);
Expand Down Expand Up @@ -150,7 +150,7 @@ public void uploadMetadataSync() {
for (byte i = 0; i < 10; i++) {
final IntVector vector = (IntVector) root.getVector("a");
final ArrowBuf metadata = allocator.buffer(1);
metadata.writeByte(i);
metadata.setByte(i, i);
vector.set(0, 10);
vector.setValueCount(1);
root.setRowCount(1);
Expand Down Expand Up @@ -186,7 +186,7 @@ public void syncMemoryReclaimed() {
for (byte i = 0; i < 10; i++) {
final IntVector vector = (IntVector) root.getVector("a");
final ArrowBuf metadata = allocator.buffer(1);
metadata.writeByte(i);
metadata.setByte(i, i);
vector.set(0, 10);
vector.setValueCount(1);
root.setRowCount(1);
Expand Down Expand Up @@ -234,7 +234,7 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
vector.setValueCount(1);
root.setRowCount(1);
final ArrowBuf metadata = allocator.buffer(1);
metadata.writeByte(i);
metadata.setByte(i, i);
listener.putNext(metadata);
}
listener.completed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void evaluate(int numRows, List<ArrowBuf> buffers,
List<ArrowBuffer> buffersLayout = new ArrayList<>();
long offset = 0;
for (ArrowBuf arrowBuf : buffers) {
long size = arrowBuf.readableBytes();
long size = arrowBuf.capacity();
buffersLayout.add(new ArrowBuffer(offset, size));
offset += size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void evaluate(int numRows, List<ArrowBuf> buffers,
List<ArrowBuffer> buffersLayout = new ArrayList<>();
long offset = 0;
for (ArrowBuf arrowBuf : buffers) {
long size = arrowBuf.readableBytes();
long size = arrowBuf.capacity();
buffersLayout.add(new ArrowBuffer(offset, size));
offset += size;
}
Expand Down Expand Up @@ -199,7 +199,7 @@ public void evaluate(int numRows, List<ArrowBuf> buffers,
List<ArrowBuffer> buffersLayout = new ArrayList<>();
long offset = 0;
for (ArrowBuf arrowBuf : buffers) {
long size = arrowBuf.readableBytes();
long size = arrowBuf.capacity();
buffersLayout.add(new ArrowBuffer(offset, size));
offset += size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class Int32DataAndVectorGenerator implements DataAndVectorGenerator {

protected final BufferAllocator allocator;
protected final Random rand;
protected int writerIndex;

Int32DataAndVectorGenerator(BufferAllocator allocator) {
this.allocator = allocator;
Expand All @@ -150,7 +151,8 @@ class Int32DataAndVectorGenerator implements DataAndVectorGenerator {

@Override
public void writeData(ArrowBuf buffer) {
buffer.writeInt(rand.nextInt());
buffer.setInt(writerIndex, rand.nextInt());
writerIndex += 4;
}

@Override
Expand All @@ -172,7 +174,8 @@ class BoundedInt32DataAndVectorGenerator extends Int32DataAndVectorGenerator {

@Override
public void writeData(ArrowBuf buffer) {
buffer.writeInt(rand.nextInt(upperBound));
buffer.setInt(writerIndex, rand.nextInt(upperBound));
writerIndex += 4;
}
}

Expand Down Expand Up @@ -205,31 +208,34 @@ public void tearDown() {

ArrowBuf buf(int length) {
ArrowBuf buffer = allocator.buffer(length);
return buffer;
ArrowBuf slice = buffer.slice(0, length);
return slice;
}

ArrowBuf buf(byte[] bytes) {
ArrowBuf buffer = allocator.buffer(bytes.length);
buffer.writeBytes(bytes);
return buffer;
buffer.setBytes(0, bytes);
ArrowBuf slice = buffer.slice(0, bytes.length);
return slice;
}

ArrowBuf arrowBufWithAllValid(int size) {
int bufLen = (size + 7) / 8;
ArrowBuf buffer = allocator.buffer(bufLen);
for (int i = 0; i < bufLen; i++) {
buffer.writeByte(255);
buffer.setByte(i, 255);
}

return buffer;
ArrowBuf slice = buffer.slice(0, bufLen);
return slice;
}

ArrowBuf intBuf(int[] ints) {
ArrowBuf buffer = allocator.buffer(ints.length * 4);
for (int i = 0; i < ints.length; i++) {
buffer.writeInt(ints[i]);
buffer.setInt(i, ints[i]);
}
return buffer;
ArrowBuf slice = buffer.slice(0, ints.length * 4);
return slice;
}

DecimalVector decimalVector(String[] values, int precision, int scale) {
Expand Down Expand Up @@ -258,25 +264,26 @@ VarCharVector varcharVector(String[] values) {
ArrowBuf longBuf(long[] longs) {
ArrowBuf buffer = allocator.buffer(longs.length * 8);
for (int i = 0; i < longs.length; i++) {
buffer.writeLong(longs[i]);
buffer.setLong(i * 8, longs[i]);
}
return buffer;
ArrowBuf slice = buffer.slice(0, longs.length * 8);
return slice;
}

ArrowBuf doubleBuf(double[] data) {
ArrowBuf buffer = allocator.buffer(data.length * 8);
for (int i = 0; i < data.length; i++) {
buffer.writeDouble(data[i]);
buffer.setDouble(i * 8, data[i]);
}

return buffer;
ArrowBuf slice = buffer.slice(0, data.length * 8);
return slice;
}

ArrowBuf stringToMillis(String[] dates) {
ArrowBuf buffer = allocator.buffer(dates.length * 8);
for (int i = 0; i < dates.length; i++) {
Instant instant = Instant.parse(dates[i]);
buffer.writeLong(instant.toEpochMilli());
buffer.setLong(i * 8, instant.toEpochMilli());
}

return buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,17 @@ List<ArrowBuf> varBufs(String[] strings, Charset charset) {

int startOffset = 0;
for (int i = 0; i < strings.length; i++) {
offsetsBuffer.writeInt(startOffset);
offsetsBuffer.setInt(i * 4, startOffset);

final byte[] bytes = strings[i].getBytes(charset);
dataBuffer = dataBuffer.reallocIfNeeded(dataBuffer.writerIndex() + bytes.length);
dataBuffer = dataBuffer.reallocIfNeeded(startOffset + bytes.length);
dataBuffer.setBytes(startOffset, bytes, 0, bytes.length);
startOffset += bytes.length;
}
offsetsBuffer.writeInt(startOffset); // offset for the last element
offsetsBuffer.setInt(strings.length * 4, startOffset); // offset for the last element

return Arrays.asList(offsetsBuffer, dataBuffer);
return Arrays.asList(offsetsBuffer.slice(0, (strings.length + 1) * 4),
dataBuffer.slice(0, startOffset));
}

List<ArrowBuf> stringBufs(String[] strings) {
Expand Down
Loading