diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java index 2eb5fc0d607..b9ac099f419 100644 --- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java +++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java @@ -71,7 +71,7 @@ public void consume(InputStream is) throws IOException { ArrowBuf offsetBuffer = vector.getOffsetBuffer(); int startIndex = offsetBuffer.getInt(currentIndex * 4); while ((read = is.read(bytes)) != -1) { - while ((dataBuffer.writerIndex() + read) > dataBuffer.capacity()) { + while ((startIndex + totalBytes + read) > dataBuffer.capacity()) { vector.reallocDataBuffer(); } PlatformDependent.copyMemory(bytes, 0, diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java index ff90e3c3e70..355c6397b8f 100644 --- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java +++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java @@ -94,7 +94,7 @@ public void consume(ResultSet resultSet) throws SQLException { String str = clob.getSubString(read, readSize); byte[] bytes = str.getBytes(StandardCharsets.UTF_8); - while ((dataBuffer.writerIndex() + bytes.length) > dataBuffer.capacity()) { + while ((startIndex + totalBytes + bytes.length) > dataBuffer.capacity()) { vector.reallocDataBuffer(); } PlatformDependent.copyMemory(bytes, 0, @@ -141,7 +141,7 @@ public void consume(ResultSet resultSet) throws SQLException { String str = clob.getSubString(read, readSize); byte[] bytes = str.getBytes(StandardCharsets.UTF_8); - while ((dataBuffer.writerIndex() + bytes.length) > dataBuffer.capacity()) { + while ((startIndex + totalBytes + bytes.length) > dataBuffer.capacity()) { vector.reallocDataBuffer(); } PlatformDependent.copyMemory(bytes, 0, diff --git a/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java index 26a1bfa0de7..4272d54c1e6 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -296,7 +296,8 @@ private InputStream asInputStream(BufferAllocator allocator) { if (appMetadata != null && appMetadata.capacity() > 0) { // Must call slice() as CodedOutputStream#writeByteBuffer writes -capacity- bytes, not -limit- bytes - cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER, appMetadata.asNettyBuffer().nioBuffer().slice()); + cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER, + appMetadata.asNettyBuffer().nioBuffer(0, appMetadata.capacity()).slice()); // This is weird, but implicitly, writing an ArrowMessage frees any references it has appMetadata.getReferenceManager().release(); } @@ -305,11 +306,11 @@ private InputStream asInputStream(BufferAllocator allocator) { int size = 0; List allBufs = new ArrayList<>(); for (ArrowBuf b : bufs) { - allBufs.add(b.asNettyBuffer()); - size += b.readableBytes(); + allBufs.add(b.asNettyBuffer().writerIndex(b.capacity())); + size += b.capacity(); // [ARROW-4213] These buffers must be aligned to an 8-byte boundary in order to be readable from C++. - if (b.readableBytes() % 8 != 0) { - int paddingBytes = 8 - (b.readableBytes() % 8); + if (b.capacity() % 8 != 0) { + int paddingBytes = 8 - (b.capacity() % 8); assert paddingBytes > 0 && paddingBytes < 8; size += paddingBytes; allBufs.add(PADDING_BUFFERS.get(paddingBytes).retain()); @@ -320,9 +321,11 @@ private InputStream asInputStream(BufferAllocator allocator) { cos.flush(); ArrowBuf initialBuf = allocator.buffer(baos.size()); - initialBuf.writeBytes(baos.toByteArray()); - final CompositeByteBuf bb = new CompositeByteBuf(allocator.getAsByteBufAllocator(), true, bufs.size() + 1, - ImmutableList.builder().add(initialBuf.asNettyBuffer()).addAll(allBufs).build()); + initialBuf.setBytes(0, baos.toByteArray()); + final CompositeByteBuf bb = new CompositeByteBuf(allocator.getAsByteBufAllocator(), true, + bufs.size() + 1, ImmutableList.builder() + .add(initialBuf.asNettyBuffer().writerIndex(baos.size())) + .addAll(allBufs).build()); // Implicitly, transfer ownership of our buffers to the input stream (which will decrement the refcount when done) final ByteBufInputStream is = new DrainableByteBufInputStream(bb); return is; diff --git a/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java b/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java index 11848690d07..2b4029252ca 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/PutResult.java @@ -17,6 +17,8 @@ package org.apache.arrow.flight; +import java.nio.ByteBuffer; + import org.apache.arrow.flight.impl.Flight; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.ReferenceManager; @@ -81,11 +83,13 @@ Flight.PutResult toProtocol() { */ static PutResult fromProtocol(BufferAllocator allocator, Flight.PutResult message) { final ArrowBuf buf = allocator.buffer(message.getAppMetadata().size()); - message.getAppMetadata().asReadOnlyByteBufferList().forEach(bb -> { - buf.setBytes(buf.writerIndex(), bb); - buf.writerIndex(buf.writerIndex() + bb.limit()); - }); - return new PutResult(buf); + + int writerIndex = 0; + for (ByteBuffer bb : message.getAppMetadata().asReadOnlyByteBufferList()) { + buf.setBytes(writerIndex, bb); + writerIndex += bb.limit(); + } + return new PutResult(buf.slice(0, writerIndex)); } @Override diff --git a/java/flight/src/main/java/org/apache/arrow/flight/example/Stream.java b/java/flight/src/main/java/org/apache/arrow/flight/example/Stream.java index 35f191570c6..e4cbc49e19e 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/example/Stream.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/example/Stream.java @@ -95,7 +95,7 @@ public void sendTo(BufferAllocator allocator, ServerStreamListener listener) { for (ArrowRecordBatch batch : batches) { final byte[] rawMetadata = Integer.toString(counter).getBytes(StandardCharsets.UTF_8); final ArrowBuf metadata = allocator.buffer(rawMetadata.length); - metadata.writeBytes(rawMetadata); + metadata.setBytes(0, rawMetadata); loader.load(batch); // Transfers ownership of the buffer - do not free buffer ourselves listener.putNext(metadata); diff --git a/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java b/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java index 3ee554c34e9..bed2ea83e25 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java @@ -112,8 +112,8 @@ private static void testStream(BufferAllocator allocator, Location server, Fligh @Override public void onNext(PutResult val) { - final byte[] metadataRaw = new byte[val.getApplicationMetadata().readableBytes()]; - val.getApplicationMetadata().readBytes(metadataRaw); + final byte[] metadataRaw = new byte[val.getApplicationMetadata().capacity()]; + val.getApplicationMetadata().getBytes(0, metadataRaw); final String metadata = new String(metadataRaw, StandardCharsets.UTF_8); if (!Integer.toString(counter).equals(metadata)) { throw new RuntimeException( @@ -123,10 +123,12 @@ public void onNext(PutResult val) { } }); int counter = 0; + int writerIndex = 0; while (reader.read(root)) { final byte[] rawMetadata = Integer.toString(counter).getBytes(StandardCharsets.UTF_8); final ArrowBuf metadata = allocator.buffer(rawMetadata.length); - metadata.writeBytes(rawMetadata); + metadata.setBytes(writerIndex, rawMetadata); + writerIndex += rawMetadata.length; // Transfers ownership of the buffer, so do not release it ourselves stream.putNext(metadata); try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { diff --git a/java/flight/src/main/java/org/apache/arrow/flight/grpc/GetReadableBuffer.java b/java/flight/src/main/java/org/apache/arrow/flight/grpc/GetReadableBuffer.java index a5edfb9c240..a28ea6d8c6b 100644 --- a/java/flight/src/main/java/org/apache/arrow/flight/grpc/GetReadableBuffer.java +++ b/java/flight/src/main/java/org/apache/arrow/flight/grpc/GetReadableBuffer.java @@ -91,8 +91,7 @@ public static void readIntoBuffer(final InputStream stream, final ArrowBuf buf, } else { byte[] heapBytes = new byte[size]; ByteStreams.readFully(stream, heapBytes); - buf.writeBytes(heapBytes); + buf.setBytes(0, heapBytes); } - buf.writerIndex(size); } } diff --git a/java/flight/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java b/java/flight/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java index 30c171b52cb..cbcce73dcf0 100644 --- a/java/flight/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java +++ b/java/flight/src/test/java/org/apache/arrow/flight/TestApplicationMetadata.java @@ -118,7 +118,7 @@ public void onNext(PutResult val) { for (byte i = 0; i < 10; i++) { final IntVector vector = (IntVector) root.getVector("a"); final ArrowBuf metadata = allocator.buffer(1); - metadata.writeByte(i); + metadata.setByte(i, i); vector.set(0, 10); vector.setValueCount(1); root.setRowCount(1); @@ -150,7 +150,7 @@ public void uploadMetadataSync() { for (byte i = 0; i < 10; i++) { final IntVector vector = (IntVector) root.getVector("a"); final ArrowBuf metadata = allocator.buffer(1); - metadata.writeByte(i); + metadata.setByte(i, i); vector.set(0, 10); vector.setValueCount(1); root.setRowCount(1); @@ -186,7 +186,7 @@ public void syncMemoryReclaimed() { for (byte i = 0; i < 10; i++) { final IntVector vector = (IntVector) root.getVector("a"); final ArrowBuf metadata = allocator.buffer(1); - metadata.writeByte(i); + metadata.setByte(i, i); vector.set(0, 10); vector.setValueCount(1); root.setRowCount(1); @@ -234,7 +234,7 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l vector.setValueCount(1); root.setRowCount(1); final ArrowBuf metadata = allocator.buffer(1); - metadata.writeByte(i); + metadata.setByte(i, i); listener.putNext(metadata); } listener.completed(); diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Filter.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Filter.java index 4e9abedadf0..58cced9c781 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Filter.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Filter.java @@ -115,7 +115,7 @@ public void evaluate(int numRows, List buffers, List buffersLayout = new ArrayList<>(); long offset = 0; for (ArrowBuf arrowBuf : buffers) { - long size = arrowBuf.readableBytes(); + long size = arrowBuf.capacity(); buffersLayout.add(new ArrowBuffer(offset, size)); offset += size; } diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java index 08bd3831f9b..dc530e0b780 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java @@ -155,7 +155,7 @@ public void evaluate(int numRows, List buffers, List buffersLayout = new ArrayList<>(); long offset = 0; for (ArrowBuf arrowBuf : buffers) { - long size = arrowBuf.readableBytes(); + long size = arrowBuf.capacity(); buffersLayout.add(new ArrowBuffer(offset, size)); offset += size; } @@ -199,7 +199,7 @@ public void evaluate(int numRows, List buffers, List buffersLayout = new ArrayList<>(); long offset = 0; for (ArrowBuf arrowBuf : buffers) { - long size = arrowBuf.readableBytes(); + long size = arrowBuf.capacity(); buffersLayout.add(new ArrowBuffer(offset, size)); offset += size; } diff --git a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/BaseEvaluatorTest.java b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/BaseEvaluatorTest.java index c774b0450e0..8d59a44f6e7 100644 --- a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/BaseEvaluatorTest.java +++ b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/BaseEvaluatorTest.java @@ -142,6 +142,7 @@ class Int32DataAndVectorGenerator implements DataAndVectorGenerator { protected final BufferAllocator allocator; protected final Random rand; + protected int writerIndex; Int32DataAndVectorGenerator(BufferAllocator allocator) { this.allocator = allocator; @@ -150,7 +151,8 @@ class Int32DataAndVectorGenerator implements DataAndVectorGenerator { @Override public void writeData(ArrowBuf buffer) { - buffer.writeInt(rand.nextInt()); + buffer.setInt(writerIndex, rand.nextInt()); + writerIndex += 4; } @Override @@ -172,7 +174,8 @@ class BoundedInt32DataAndVectorGenerator extends Int32DataAndVectorGenerator { @Override public void writeData(ArrowBuf buffer) { - buffer.writeInt(rand.nextInt(upperBound)); + buffer.setInt(writerIndex, rand.nextInt(upperBound)); + writerIndex += 4; } } @@ -205,31 +208,34 @@ public void tearDown() { ArrowBuf buf(int length) { ArrowBuf buffer = allocator.buffer(length); - return buffer; + ArrowBuf slice = buffer.slice(0, length); + return slice; } ArrowBuf buf(byte[] bytes) { ArrowBuf buffer = allocator.buffer(bytes.length); - buffer.writeBytes(bytes); - return buffer; + buffer.setBytes(0, bytes); + ArrowBuf slice = buffer.slice(0, bytes.length); + return slice; } ArrowBuf arrowBufWithAllValid(int size) { int bufLen = (size + 7) / 8; ArrowBuf buffer = allocator.buffer(bufLen); for (int i = 0; i < bufLen; i++) { - buffer.writeByte(255); + buffer.setByte(i, 255); } - - return buffer; + ArrowBuf slice = buffer.slice(0, bufLen); + return slice; } ArrowBuf intBuf(int[] ints) { ArrowBuf buffer = allocator.buffer(ints.length * 4); for (int i = 0; i < ints.length; i++) { - buffer.writeInt(ints[i]); + buffer.setInt(i, ints[i]); } - return buffer; + ArrowBuf slice = buffer.slice(0, ints.length * 4); + return slice; } DecimalVector decimalVector(String[] values, int precision, int scale) { @@ -258,25 +264,26 @@ VarCharVector varcharVector(String[] values) { ArrowBuf longBuf(long[] longs) { ArrowBuf buffer = allocator.buffer(longs.length * 8); for (int i = 0; i < longs.length; i++) { - buffer.writeLong(longs[i]); + buffer.setLong(i * 8, longs[i]); } - return buffer; + ArrowBuf slice = buffer.slice(0, longs.length * 8); + return slice; } ArrowBuf doubleBuf(double[] data) { ArrowBuf buffer = allocator.buffer(data.length * 8); for (int i = 0; i < data.length; i++) { - buffer.writeDouble(data[i]); + buffer.setDouble(i * 8, data[i]); } - - return buffer; + ArrowBuf slice = buffer.slice(0, data.length * 8); + return slice; } ArrowBuf stringToMillis(String[] dates) { ArrowBuf buffer = allocator.buffer(dates.length * 8); for (int i = 0; i < dates.length; i++) { Instant instant = Instant.parse(dates[i]); - buffer.writeLong(instant.toEpochMilli()); + buffer.setLong(i * 8, instant.toEpochMilli()); } return buffer; diff --git a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java index f6632e5e340..fa4225a6a21 100644 --- a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java +++ b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java @@ -74,16 +74,17 @@ List varBufs(String[] strings, Charset charset) { int startOffset = 0; for (int i = 0; i < strings.length; i++) { - offsetsBuffer.writeInt(startOffset); + offsetsBuffer.setInt(i * 4, startOffset); final byte[] bytes = strings[i].getBytes(charset); - dataBuffer = dataBuffer.reallocIfNeeded(dataBuffer.writerIndex() + bytes.length); + dataBuffer = dataBuffer.reallocIfNeeded(startOffset + bytes.length); dataBuffer.setBytes(startOffset, bytes, 0, bytes.length); startOffset += bytes.length; } - offsetsBuffer.writeInt(startOffset); // offset for the last element + offsetsBuffer.setInt(strings.length * 4, startOffset); // offset for the last element - return Arrays.asList(offsetsBuffer, dataBuffer); + return Arrays.asList(offsetsBuffer.slice(0, (strings.length + 1) * 4), + dataBuffer.slice(0, startOffset)); } List stringBufs(String[] strings) { diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index 91fa5ffd3a7..30b6a98b535 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -61,11 +61,11 @@ public final class ArrowBuf implements AutoCloseable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class); - private static final int SHORT_SIZE = Short.BYTES; - private static final int INT_SIZE = Integer.BYTES; - private static final int FLOAT_SIZE = Float.BYTES; - private static final int DOUBLE_SIZE = Double.BYTES; - private static final int LONG_SIZE = Long.BYTES; + public static final int SHORT_SIZE = Short.BYTES; + public static final int INT_SIZE = Integer.BYTES; + public static final int FLOAT_SIZE = Float.BYTES; + public static final int DOUBLE_SIZE = Double.BYTES; + public static final int LONG_SIZE = Long.BYTES; private static final AtomicLong idGenerator = new AtomicLong(0); private static final int LOG_BYTES_PER_ROW = 10; @@ -74,8 +74,6 @@ public final class ArrowBuf implements AutoCloseable { private final BufferManager bufferManager; private final long addr; private final boolean isEmpty; - private int readerIndex; - private int writerIndex; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) : null; private volatile int length; @@ -97,8 +95,6 @@ public ArrowBuf( this.isEmpty = isEmpty; this.addr = memoryAddress; this.length = length; - this.readerIndex = 0; - this.writerIndex = 0; if (BaseAllocator.DEBUG) { historicalLog.recordEvent("create()"); } @@ -141,11 +137,9 @@ private void ensureAccessible() { public NettyArrowBuf asNettyBuffer() { final NettyArrowBuf nettyArrowBuf = new NettyArrowBuf( - this, - isEmpty ? null : referenceManager.getAllocator().getAsByteBufAllocator(), - length); - nettyArrowBuf.readerIndex(readerIndex); - nettyArrowBuf.writerIndex(writerIndex); + this, + isEmpty ? null : referenceManager.getAllocator().getAsByteBufAllocator(), + length); return nettyArrowBuf; } @@ -193,29 +187,6 @@ public ByteOrder order() { return ByteOrder.LITTLE_ENDIAN; } - /** - * Returns the number of bytes still available to read in this buffer. - */ - public int readableBytes() { - Preconditions.checkState(writerIndex >= readerIndex, - "Writer index cannot be less than reader index"); - return writerIndex - readerIndex; - } - - /** - * Returns the number of bytes still available to write into this buffer before capacity is reached. - */ - public int writableBytes() { - return capacity() - writerIndex; - } - - /** - * Returns a slice of only the readable bytes in the buffer. - */ - public ArrowBuf slice() { - return slice(readerIndex, readableBytes()); - } - /** * Returns a slice (view) starting at index with the given length. */ @@ -233,12 +204,11 @@ public ArrowBuf slice(int index, int length) { * explains that derived buffers share their reference count with their parent */ final ArrowBuf newBuf = referenceManager.deriveBuffer(this, index, length); - newBuf.writerIndex(length); return newBuf; } public ByteBuffer nioBuffer() { - return isEmpty ? ByteBuffer.allocateDirect(0) : asNettyBuffer().nioBuffer(); + return isEmpty ? ByteBuffer.allocateDirect(0) : asNettyBuffer().nioBuffer(0, capacity()); } public ByteBuffer nioBuffer(int index, int length) { @@ -525,170 +495,6 @@ public byte getByte(int index) { return PlatformDependent.getByte(addr(index)); } - - - /*--------------------------------------------------* - | Following are another set of data set APIs | - | that directly work with writerIndex | - | | - *--------------------------------------------------*/ - - - - /** - * Helper function to do bound checking w.r.t writerIndex - * by checking if we can set "length" bytes of data at the - * writerIndex in this ArrowBuf. - * @param length provided length of data for set - */ - private void ensureWritable(final int length) { - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - Preconditions.checkArgument(length >= 0, "expecting non-negative length"); - // check reference count - this.ensureAccessible(); - // check bounds - if (length > writableBytes()) { - throw new IndexOutOfBoundsException( - String.format("writerIndex(%d) + length(%d) exceeds capacity(%d)", writerIndex, length, capacity())); - } - } - } - - /** - * Helper function to do bound checking w.r.t readerIndex - * by checking if we can read "length" bytes of data at the - * readerIndex in this ArrowBuf. - * @param length provided length of data for get - */ - private void ensureReadable(final int length) { - if (BoundsChecking.BOUNDS_CHECKING_ENABLED) { - Preconditions.checkArgument(length >= 0, "expecting non-negative length"); - // check reference count - this.ensureAccessible(); - // check bounds - if (length > readableBytes()) { - throw new IndexOutOfBoundsException( - String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d)", readerIndex, length, writerIndex)); - } - } - } - - /** - * Read the byte at readerIndex. - * @return byte value - */ - public byte readByte() { - ensureReadable(1); - final byte b = getByte(readerIndex); - ++readerIndex; - return b; - } - - /** - * Read dst.length bytes at readerIndex into dst byte array - * @param dst byte array where the data will be written - */ - public void readBytes(byte[] dst) { - Preconditions.checkArgument(dst != null, "expecting valid dst bytearray"); - ensureReadable(dst.length); - getBytes(readerIndex, dst, 0, dst.length); - } - - /** - * Set the provided byte value at the writerIndex. - * @param value value to set - */ - public void writeByte(byte value) { - ensureWritable(1); - PlatformDependent.putByte(addr(writerIndex), value); - ++writerIndex; - } - - /** - * Set the lower order byte for the provided value at - * the writerIndex. - * @param value value to be set - */ - public void writeByte(int value) { - ensureWritable(1); - PlatformDependent.putByte(addr(writerIndex), (byte)value); - ++writerIndex; - } - - /** - * Write the bytes from given byte array into this - * ArrowBuf starting at writerIndex. - * @param src src byte array - */ - public void writeBytes(byte[] src) { - Preconditions.checkArgument(src != null, "expecting valid src array"); - writeBytes(src, 0, src.length); - } - - /** - * Write the bytes from given byte array starting at srcIndex - * into this ArrowBuf starting at writerIndex. - * @param src src byte array - * @param srcIndex index in the byte array where the copy will being from - * @param length length of data to copy - */ - public void writeBytes(byte[] src, int srcIndex, int length) { - ensureWritable(length); - setBytes(writerIndex, src, srcIndex, length); - writerIndex += length; - } - - /** - * Set the provided int value as short at the writerIndex. - * @param value value to set - */ - public void writeShort(int value) { - ensureWritable(SHORT_SIZE); - PlatformDependent.putShort(addr(writerIndex), (short) value); - writerIndex += SHORT_SIZE; - } - - /** - * Set the provided int value at the writerIndex. - * @param value value to set - */ - public void writeInt(int value) { - ensureWritable(INT_SIZE); - PlatformDependent.putInt(addr(writerIndex), value); - writerIndex += INT_SIZE; - } - - /** - * Set the provided long value at the writerIndex. - * @param value value to set - */ - public void writeLong(long value) { - ensureWritable(LONG_SIZE); - PlatformDependent.putLong(addr(writerIndex), value); - writerIndex += LONG_SIZE; - } - - /** - * Set the provided float value at the writerIndex. - * @param value value to set - */ - public void writeFloat(float value) { - ensureWritable(FLOAT_SIZE); - PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value)); - writerIndex += FLOAT_SIZE; - } - - /** - * Set the provided double value at the writerIndex. - * @param value value to set - */ - public void writeDouble(double value) { - ensureWritable(DOUBLE_SIZE); - PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value)); - writerIndex += DOUBLE_SIZE; - } - - /*--------------------------------------------------* | Following are another set of data set/get APIs | | that read and write stream of bytes from/to byte | @@ -970,7 +776,7 @@ public void setBytes(int index, ArrowBuf src, int srcIndex, int length) { } /** - * Copy readableBytes() number of bytes from src ArrowBuf + * Copy all bytes from src ArrowBuf * starting from its readerIndex into this ArrowBuf starting * at the given index. * @param index index index (0 based relative to the portion of memory @@ -980,13 +786,12 @@ public void setBytes(int index, ArrowBuf src, int srcIndex, int length) { public void setBytes(int index, ArrowBuf src) { // null check Preconditions.checkArgument(src != null, "expecting valid ArrowBuf"); - final int length = src.readableBytes(); + final int length = src.capacity(); // bound check for this ArrowBuf where the data will be copied into checkIndex(index, length); - final long srcAddress = src.memoryAddress() + (long)src.readerIndex; + final long srcAddress = src.memoryAddress(); final long dstAddress = addr(index); PlatformDependent.copyMemory(srcAddress, dstAddress, (long)length); - src.readerIndex(src.readerIndex + length); } /** @@ -1113,42 +918,6 @@ public void print(StringBuilder sb, int indent, Verbosity verbosity) { } } - /** - * Get the index at which the next byte will be read from. - * @return reader index - */ - public int readerIndex() { - return readerIndex; - } - - /** - * Get the index at which next byte will be written to. - * @return writer index - */ - public int writerIndex() { - return writerIndex; - } - - /** - * Set the reader index for this ArrowBuf. - * @param readerIndex new reader index - * @return this ArrowBuf - */ - public ArrowBuf readerIndex(int readerIndex) { - this.readerIndex = readerIndex; - return this; - } - - /** - * Set the writer index for this ArrowBuf. - * @param writerIndex new writer index - * @return this ArrowBuf - */ - public ArrowBuf writerIndex(int writerIndex) { - this.writerIndex = writerIndex; - return this; - } - /** * Zero-out the bytes in this ArrowBuf starting at * the given index for the given length. @@ -1220,27 +989,4 @@ public void retain(int increment) { referenceManager.retain(increment); } - @Deprecated - public ArrowBuf clear() { - this.readerIndex = this.writerIndex = 0; - return this; - } - - /** - * Initialize the reader and writer index. - * @param readerIndex index to read from - * @param writerIndex index to write to - * @return this - */ - @Deprecated - public ArrowBuf setIndex(int readerIndex, int writerIndex) { - if (readerIndex >= 0 && readerIndex <= writerIndex && writerIndex <= this.capacity()) { - this.readerIndex = readerIndex; - this.writerIndex = writerIndex; - return this; - } else { - throw new IndexOutOfBoundsException(String.format("readerIndex: %d, writerIndex: %d " + - "(expected:0 <= readerIndex <= writerIndex <= capacity(%d))", readerIndex, writerIndex, this.capacity())); - } - } } diff --git a/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java index 43d3f60d866..3e548d0fad6 100644 --- a/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java @@ -161,12 +161,14 @@ public int capacity() { @Override public NettyArrowBuf slice() { - return arrowBuf.slice(readerIndex, writerIndex - readerIndex).asNettyBuffer(); + return slice(readerIndex, writerIndex - readerIndex); } @Override public NettyArrowBuf slice(int index, int length) { - return arrowBuf.slice(index, length).asNettyBuffer(); + NettyArrowBuf slice = arrowBuf.slice(index, length).asNettyBuffer(); + slice.writerIndex(length); + return slice; } @Override diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java index 90d74a3ca3b..2c95cfef830 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -328,8 +328,6 @@ public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) { // create a new ArrowBuf to associate with new allocator and target ref manager final int targetBufLength = srcBuffer.capacity(); ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength); - targetArrowBuf.readerIndex(srcBuffer.readerIndex()); - targetArrowBuf.writerIndex(srcBuffer.writerIndex()); return targetArrowBuf; } @@ -425,8 +423,6 @@ public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAl // create a new ArrowBuf to associate with new allocator and target ref manager final int targetBufLength = srcBuffer.capacity(); final ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength); - targetArrowBuf.readerIndex(srcBuffer.readerIndex()); - targetArrowBuf.writerIndex(srcBuffer.writerIndex()); final boolean allocationFit = transferBalance(targetRefManager); return new TransferResult(allocationFit, targetArrowBuf); } diff --git a/java/memory/src/test/java/io/netty/buffer/TestNettyArrowBuf.java b/java/memory/src/test/java/io/netty/buffer/TestNettyArrowBuf.java index 5672dfff48e..0f84c9c485e 100644 --- a/java/memory/src/test/java/io/netty/buffer/TestNettyArrowBuf.java +++ b/java/memory/src/test/java/io/netty/buffer/TestNettyArrowBuf.java @@ -131,7 +131,7 @@ public void testGetCompositeBuffer() { // to set capacity. byteBufs.addComponent(true, buf2); byteBufs.capacity(20); - buf.asNettyBuffer().getBytes(0, byteBufs, 4); + buf.asNettyBuffer().getBytes(0, byteBufs,4); int actual = byteBufs.getInt(0); Assert.assertEquals(expected, actual); byteBufs.component(0).release(); diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index fd5bccd58c9..df815683524 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -747,52 +747,23 @@ public void testAllocator_sliceRanges() throws Exception { // Populate a buffer with byte values corresponding to their indices. final ArrowBuf arrowBuf = rootAllocator.buffer(256); assertEquals(256, arrowBuf.capacity()); - assertEquals(0, arrowBuf.readerIndex()); - assertEquals(0, arrowBuf.readableBytes()); - assertEquals(0, arrowBuf.writerIndex()); - assertEquals(256, arrowBuf.writableBytes()); - - final ArrowBuf slice3 = arrowBuf.slice(); - assertEquals(0, slice3.readerIndex()); - assertEquals(0, slice3.readableBytes()); - assertEquals(0, slice3.writerIndex()); - // assertEquals(256, slice3.capacity()); - // assertEquals(256, slice3.writableBytes()); for (int i = 0; i < 256; ++i) { - arrowBuf.writeByte(i); + arrowBuf.setByte(i, i); } - assertEquals(0, arrowBuf.readerIndex()); - assertEquals(256, arrowBuf.readableBytes()); - assertEquals(256, arrowBuf.writerIndex()); - assertEquals(0, arrowBuf.writableBytes()); - - final ArrowBuf slice1 = arrowBuf.slice(); - assertEquals(0, slice1.readerIndex()); - assertEquals(256, slice1.readableBytes()); + + final ArrowBuf slice1 = arrowBuf.slice(0, 256); for (int i = 0; i < 10; ++i) { - assertEquals(i, slice1.readByte()); + assertEquals(i, slice1.getByte(i)); } - assertEquals(256 - 10, slice1.readableBytes()); for (int i = 0; i < 256; ++i) { assertEquals((byte) i, slice1.getByte(i)); } final ArrowBuf slice2 = arrowBuf.slice(25, 25); - assertEquals(0, slice2.readerIndex()); - assertEquals(25, slice2.readableBytes()); - for (int i = 25; i < 50; ++i) { - assertEquals(i, slice2.readByte()); - } - - /* - for(int i = 256; i > 0; --i) { - slice3.writeByte(i - 1); - } - for(int i = 0; i < 256; ++i) { - assertEquals(255 - i, slice1.getByte(i)); + for (int i = 0; i < 25; ++i) { + assertEquals(i + 25, slice2.getByte(i)); } - */ arrowBuf.getReferenceManager().release(); // all the derived buffers share this fate } @@ -806,7 +777,7 @@ public void testAllocator_slicesOfSlices() throws Exception { // Populate a buffer with byte values corresponding to their indices. final ArrowBuf arrowBuf = rootAllocator.buffer(256); for (int i = 0; i < 256; ++i) { - arrowBuf.writeByte(i); + arrowBuf.setByte(i, i); } // Slice it up. @@ -1064,7 +1035,6 @@ public void multiple() throws Exception { } public void assertEquiv(ArrowBuf origBuf, ArrowBuf newBuf) { - assertEquals(origBuf.readerIndex(), newBuf.readerIndex()); - assertEquals(origBuf.writerIndex(), newBuf.writerIndex()); + assertEquals(origBuf.capacity(), newBuf.capacity()); } } diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index 7f5ca3ec287..f0f38d986e0 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -157,15 +157,16 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers @Override public List getFieldBuffers() { List result = new ArrayList<>(1); - setReaderAndWriterIndex(); - result.add(typeBuffer); + result.add(sliceTypeBuffer()); return result; } - private void setReaderAndWriterIndex() { - typeBuffer.readerIndex(0); - typeBuffer.writerIndex(valueCount * TYPE_WIDTH); + private ArrowBuf sliceTypeBuffer() { + if (valueCount == 0) { + return typeBuffer.slice(0, 0); + } + return typeBuffer.slice(0, valueCount * TYPE_WIDTH); } @Override @@ -299,7 +300,6 @@ public boolean allocateNewSafe() { private void allocateTypeBuffer() { typeBuffer = allocator.buffer(typeBufferAllocationSizeInBytes); - typeBuffer.readerIndex(0); typeBuffer.setZero(0, typeBuffer.capacity()); } @@ -528,14 +528,12 @@ public int getBufferSizeFor(final int valueCount) { @Override public ArrowBuf[] getBuffers(boolean clear) { List list = new java.util.ArrayList<>(); - setReaderAndWriterIndex(); if (getBufferSize() != 0) { - list.add(typeBuffer); + list.add(sliceTypeBuffer()); list.addAll(java.util.Arrays.asList(internalStruct.getBuffers(clear))); } if (clear) { valueCount = 0; - typeBuffer.getReferenceManager().retain(); typeBuffer.getReferenceManager().release(); typeBuffer = allocator.getEmpty(); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java index d5c6b186cf7..7c70f004911 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java @@ -339,7 +339,6 @@ private void allocateBytes(int valueCount) { */ private void allocateValidityBuffer(final int validityBufferSize) { validityBuffer = allocator.buffer(validityBufferSize); - validityBuffer.readerIndex(0); } /** @@ -392,18 +391,17 @@ public Field getField() { @Override public ArrowBuf[] getBuffers(boolean clear) { final ArrowBuf[] buffers; - setReaderAndWriterIndex(); if (getBufferSize() == 0) { buffers = new ArrowBuf[0]; } else { buffers = new ArrowBuf[2]; - buffers[0] = validityBuffer; - buffers[1] = valueBuffer; + buffers[0] = sliceValidityBuffer(); + buffers[1] = sliceValueBuffer(); } if (clear) { - for (final ArrowBuf buffer : buffers) { + /*for (final ArrowBuf buffer : buffers) { buffer.getReferenceManager().retain(1); - } + }*/ clear(); } return buffers; @@ -499,31 +497,25 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers */ public List getFieldBuffers() { List result = new ArrayList<>(2); - setReaderAndWriterIndex(); - result.add(validityBuffer); - result.add(valueBuffer); + result.add(sliceValidityBuffer()); + result.add(sliceValueBuffer()); return result; } - /** - * Set the reader and writer indexes for the inner buffers. - */ - private void setReaderAndWriterIndex() { - validityBuffer.readerIndex(0); - valueBuffer.readerIndex(0); + private ArrowBuf sliceValidityBuffer() { if (valueCount == 0) { - validityBuffer.writerIndex(0); - valueBuffer.writerIndex(0); - } else { - validityBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount)); - if (typeWidth == 0) { - /* specialized handling for BitVector */ - valueBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount)); - } else { - valueBuffer.writerIndex(valueCount * typeWidth); - } + return validityBuffer.slice(0, 0); + } + return validityBuffer.slice(0, getValidityBufferSizeFromCount(valueCount)); + } + + private ArrowBuf sliceValueBuffer() { + if (valueCount == 0) { + return valueBuffer.slice(0, 0); } + return valueBuffer.slice(0, typeWidth == 0 ? + getValidityBufferSizeFromCount(valueCount) : valueCount * typeWidth); } /** @@ -735,7 +727,6 @@ public void setValueCount(int valueCount) { decrementAllocationMonitor(); } } - setReaderAndWriterIndex(); } /** diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java index 04c693f7e11..d63c69432f7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseValueVector.java @@ -200,8 +200,6 @@ DataAndValidityBuffers allocFixedDataAndValidityBufs(int valueCount, int typeWid int len = (numBuffers == 0 ? dataBufferSize : validityBufferSize); ArrowBuf buf = combinedBuffer.slice(bufferOffset, len); buf.getReferenceManager().retain(); - buf.readerIndex(0); - buf.writerIndex(0); bufferOffset += len; if (numBuffers == 0) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java index 0ca79eb072f..d3193365cb6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java @@ -336,31 +336,32 @@ public List getFieldBuffers() { fillHoles(valueCount); List result = new ArrayList<>(3); - setReaderAndWriterIndex(); - result.add(validityBuffer); - result.add(offsetBuffer); - result.add(valueBuffer); + result.add(sliceValidityBuffer()); + result.add(sliceOffsetBuffer()); + result.add(sliceValueBuffer()); return result; } - /** - * Set the reader and writer indexes for the inner buffers. - */ - private void setReaderAndWriterIndex() { - validityBuffer.readerIndex(0); - offsetBuffer.readerIndex(0); - valueBuffer.readerIndex(0); + private ArrowBuf sliceValidityBuffer() { if (valueCount == 0) { - validityBuffer.writerIndex(0); - offsetBuffer.writerIndex(0); - valueBuffer.writerIndex(0); - } else { - final int lastDataOffset = getStartOffset(valueCount); - validityBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount)); - offsetBuffer.writerIndex((valueCount + 1) * OFFSET_WIDTH); - valueBuffer.writerIndex(lastDataOffset); + return validityBuffer.slice(0, 0); + } + return validityBuffer.slice(0, getValidityBufferSizeFromCount(valueCount)); + } + + private ArrowBuf sliceValueBuffer() { + if (valueCount == 0) { + return valueBuffer.slice(0, 0); } + return valueBuffer.slice(0, getStartOffset(valueCount)); + } + + private ArrowBuf sliceOffsetBuffer() { + if (valueCount == 0) { + return offsetBuffer.slice(0, 0); + } + return offsetBuffer.slice(0, (valueCount + 1) * OFFSET_WIDTH); } /** @@ -450,7 +451,6 @@ private void allocateBytes(final int valueBufferSize, final int valueCount) { /* allocate data buffer */ int curSize = valueBufferSize; valueBuffer = allocator.buffer(curSize); - valueBuffer.readerIndex(0); /* allocate offset buffer and validity buffer */ DataAndValidityBuffers buffers = allocFixedDataAndValidityBufs(valueCount + 1, OFFSET_WIDTH); @@ -467,7 +467,6 @@ private void allocateBytes(final int valueBufferSize, final int valueCount) { private void allocateOffsetBuffer(final long size) { final int curSize = (int) size; offsetBuffer = allocator.buffer(curSize); - offsetBuffer.readerIndex(0); initOffsetBuffer(); } @@ -475,7 +474,6 @@ private void allocateOffsetBuffer(final long size) { private void allocateValidityBuffer(final long size) { final int curSize = (int) size; validityBuffer = allocator.buffer(curSize); - validityBuffer.readerIndex(0); initValidityBuffer(); } @@ -637,19 +635,15 @@ public Field getField() { @Override public ArrowBuf[] getBuffers(boolean clear) { final ArrowBuf[] buffers; - setReaderAndWriterIndex(); if (getBufferSize() == 0) { buffers = new ArrowBuf[0]; } else { buffers = new ArrowBuf[3]; - buffers[0] = validityBuffer; - buffers[1] = offsetBuffer; - buffers[2] = valueBuffer; + buffers[0] = sliceValidityBuffer(); + buffers[1] = sliceOffsetBuffer(); + buffers[2] = sliceValueBuffer(); } if (clear) { - for (final ArrowBuf buffer : buffers) { - buffer.getReferenceManager().retain(); - } clear(); } return buffers; @@ -877,7 +871,6 @@ public void setValueCount(int valueCount) { } fillHoles(valueCount); lastSet = valueCount - 1; - setReaderAndWriterIndex(); } /** @@ -1287,9 +1280,6 @@ public static ArrowBuf set(ArrowBuf buffer, BufferAllocator allocator, buffer = allocator.buffer(valueCount * OFFSET_WIDTH); } buffer.setInt(index * OFFSET_WIDTH, value); - if (index == (valueCount - 1)) { - buffer.writerIndex(valueCount * OFFSET_WIDTH); - } return buffer; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java index c30432ace49..ec769179a18 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java @@ -196,7 +196,6 @@ private ArrowBuf splitAndTransferBuffer( * another part in (i+1)-th byte. */ destBuffer = allocator.buffer(byteSizeTarget); - destBuffer.readerIndex(0); destBuffer.setZero(0, destBuffer.capacity()); for (int i = 0; i < byteSizeTarget - 1; i++) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java index f17e9e82b79..1f529f7e9ae 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java @@ -144,9 +144,6 @@ public static ArrowBuf setValidityBit(ArrowBuf validityBuffer, BufferAllocator a validityBuffer = allocator.buffer(getValidityBufferSize(valueCount)); } setValidityBit(validityBuffer, index, value); - if (index == (valueCount - 1)) { - validityBuffer.writerIndex(getValidityBufferSize(valueCount)); - } return validityBuffer; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java index 39868c508da..0c6ce9c90e7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java @@ -323,9 +323,7 @@ public int getBufferSize() { int actualBufSize = 0; for (final ValueVector v : vectors.values()) { - for (final ArrowBuf buf : v.getBuffers(false)) { - actualBufSize += buf.writerIndex(); - } + actualBufSize += v.getBufferSize(); } return actualBufSize; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java index a43c8ccc752..8e2c3710bb9 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java @@ -99,7 +99,6 @@ public boolean allocateNewSafe() { protected void allocateOffsetBuffer(final long size) { final int curSize = (int) size; offsetBuffer = allocator.buffer(curSize); - offsetBuffer.readerIndex(0); offsetAllocationSizeInBytes = curSize; offsetBuffer.setZero(0, offsetBuffer.capacity()); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java index d71d95c18a4..2d136f60c32 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java @@ -169,15 +169,16 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers @Override public List getFieldBuffers() { List result = new ArrayList<>(1); - setReaderAndWriterIndex(); - result.add(validityBuffer); + result.add(sliceValidityBuffer()); return result; } - private void setReaderAndWriterIndex() { - validityBuffer.readerIndex(0); - validityBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount)); + private ArrowBuf sliceValidityBuffer() { + if (valueCount == 0) { + return validityBuffer.slice(0, 0); + } + return validityBuffer.slice(0, getValidityBufferSizeFromCount(valueCount)); } @Override @@ -232,7 +233,6 @@ public boolean allocateNewSafe() { private void allocateValidityBuffer(final long size) { final int curSize = (int) size; validityBuffer = allocator.buffer(curSize); - validityBuffer.readerIndex(0); validityAllocationSizeInBytes = curSize; validityBuffer.setZero(0, validityBuffer.capacity()); } @@ -342,20 +342,16 @@ public void reset() { @Override public ArrowBuf[] getBuffers(boolean clear) { - setReaderAndWriterIndex(); final ArrowBuf[] buffers; if (getBufferSize() == 0) { buffers = new ArrowBuf[0]; } else { List list = new ArrayList<>(); - list.add(validityBuffer); + list.add(sliceValidityBuffer()); list.addAll(Arrays.asList(vector.getBuffers(false))); buffers = list.toArray(new ArrowBuf[list.size()]); } if (clear) { - for (ArrowBuf buffer : buffers) { - buffer.getReferenceManager().retain(); - } clear(); } return buffers; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index b6ad8224ef9..b3aa916d7df 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -219,26 +219,24 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers @Override public List getFieldBuffers() { List result = new ArrayList<>(2); - setReaderAndWriterIndex(); - result.add(validityBuffer); - result.add(offsetBuffer); + result.add(sliceValidityBuffer()); + result.add(sliceOffsetBuffer()); return result; } - /** - * Set the reader and writer indexes for the inner buffers. - */ - private void setReaderAndWriterIndex() { - validityBuffer.readerIndex(0); - offsetBuffer.readerIndex(0); + private ArrowBuf sliceValidityBuffer() { if (valueCount == 0) { - validityBuffer.writerIndex(0); - offsetBuffer.writerIndex(0); - } else { - validityBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount)); - offsetBuffer.writerIndex((valueCount + 1) * OFFSET_WIDTH); + return validityBuffer.slice(0, 0); + } + return validityBuffer.slice(0, getValidityBufferSizeFromCount(valueCount)); + } + + private ArrowBuf sliceOffsetBuffer() { + if (valueCount == 0) { + return offsetBuffer.slice(0, 0); } + return offsetBuffer.slice(0, (valueCount + 1) * OFFSET_WIDTH); } @Override @@ -289,7 +287,6 @@ public boolean allocateNewSafe() { private void allocateValidityBuffer(final long size) { final int curSize = (int) size; validityBuffer = allocator.buffer(curSize); - validityBuffer.readerIndex(0); validityAllocationSizeInBytes = curSize; validityBuffer.setZero(0, validityBuffer.capacity()); } @@ -651,21 +648,17 @@ public void reset() { */ @Override public ArrowBuf[] getBuffers(boolean clear) { - setReaderAndWriterIndex(); final ArrowBuf[] buffers; if (getBufferSize() == 0) { buffers = new ArrowBuf[0]; } else { List list = new ArrayList<>(); - list.add(offsetBuffer); - list.add(validityBuffer); + list.add(sliceOffsetBuffer()); + list.add(sliceValidityBuffer()); list.addAll(Arrays.asList(vector.getBuffers(false))); buffers = list.toArray(new ArrowBuf[list.size()]); } if (clear) { - for (ArrowBuf buffer : buffers) { - buffer.getReferenceManager().retain(); - } clear(); } return buffers; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java index 71af6dfed29..bc66fa3c99c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java @@ -113,15 +113,16 @@ public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers @Override public List getFieldBuffers() { List result = new ArrayList<>(1); - setReaderAndWriterIndex(); - result.add(validityBuffer); + result.add(sliceValidityBuffer()); return result; } - private void setReaderAndWriterIndex() { - validityBuffer.readerIndex(0); - validityBuffer.writerIndex(BitVectorHelper.getValidityBufferSize(valueCount)); + private ArrowBuf sliceValidityBuffer() { + if (valueCount == 0) { + return validityBuffer.slice(0, 0); + } + return validityBuffer.slice(0, BitVectorHelper.getValidityBufferSize(valueCount)); } @Override @@ -286,20 +287,16 @@ public int getValueCapacity() { */ @Override public ArrowBuf[] getBuffers(boolean clear) { - setReaderAndWriterIndex(); final ArrowBuf[] buffers; if (getBufferSize() == 0) { buffers = new ArrowBuf[0]; } else { List list = new ArrayList<>(); - list.add(validityBuffer); + list.add(sliceValidityBuffer()); list.addAll(Arrays.asList(super.getBuffers(false))); buffers = list.toArray(new ArrowBuf[list.size()]); } if (clear) { - for (ArrowBuf buffer : buffers) { - buffer.getReferenceManager().retain(); - } clear(); } @@ -407,7 +404,6 @@ public boolean allocateNewSafe() { private void allocateValidityBuffer(final long size) { final int curSize = (int) size; validityBuffer = allocator.buffer(curSize); - validityBuffer.readerIndex(0); validityAllocationSizeInBytes = curSize; validityBuffer.setZero(0, validityBuffer.capacity()); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java index 2e37e835058..564669ba3e5 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java @@ -18,6 +18,7 @@ package org.apache.arrow.vector.ipc; import static com.fasterxml.jackson.core.JsonToken.*; +import static io.netty.buffer.ArrowBuf.*; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.arrow.vector.BufferLayout.BufferType.*; @@ -244,7 +245,6 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException BitVectorHelper.setValidityBit(buf, i, parser.readValueAs(Boolean.class) ? 1 : 0); } - buf.writerIndex(bufferSize); return buf; } }; @@ -257,8 +257,8 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { readToken(START_OBJECT); - buf.writeInt(readNextField("days", Integer.class)); - buf.writeInt(readNextField("milliseconds", Integer.class)); + buf.setInt(i * INT_SIZE * 2, readNextField("days", Integer.class)); + buf.setInt(i * INT_SIZE * 2 + INT_SIZE, readNextField("milliseconds", Integer.class)); readToken(END_OBJECT); } @@ -274,7 +274,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); - buf.writeByte(parser.getByteValue()); + buf.setByte(i, parser.getByteValue()); } return buf; @@ -289,7 +289,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); - buf.writeShort(parser.getShortValue()); + buf.setShort(i * SHORT_SIZE, parser.getShortValue()); } return buf; @@ -304,7 +304,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); - buf.writeInt(parser.getIntValue()); + buf.setInt(i * INT_SIZE, parser.getIntValue()); } return buf; @@ -320,7 +320,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); String value = parser.getValueAsString(); - buf.writeLong(Long.valueOf(value)); + buf.setLong(i * LONG_SIZE, Long.valueOf(value)); } return buf; @@ -335,7 +335,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); - buf.writeByte(parser.getShortValue() & 0xFF); + buf.setByte(i, parser.getShortValue() & 0xFF); } return buf; @@ -350,7 +350,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); - buf.writeShort(parser.getIntValue() & 0xFFFF); + buf.setShort(i * SHORT_SIZE, parser.getIntValue() & 0xFFFF); } return buf; @@ -365,7 +365,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); - buf.writeInt((int)parser.getLongValue()); + buf.setInt(i * INT_SIZE, (int)parser.getLongValue()); } return buf; @@ -381,7 +381,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); BigInteger value = new BigInteger(parser.getValueAsString()); - buf.writeLong(value.longValue()); + buf.setLong(i * LONG_SIZE, value.longValue()); } return buf; @@ -396,7 +396,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); - buf.writeFloat(parser.getFloatValue()); + buf.setFloat(i * FLOAT_SIZE, parser.getFloatValue()); } return buf; @@ -411,7 +411,7 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException for (int i = 0; i < count; i++) { parser.nextToken(); - buf.writeDouble(parser.getDoubleValue()); + buf.setDouble(i * DOUBLE_SIZE, parser.getDoubleValue()); } return buf; @@ -430,7 +430,6 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException DecimalUtility.writeBigDecimalToArrowBuf(decimalValue, buf, i); } - buf.writerIndex(size); return buf; } }; @@ -447,8 +446,9 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException int byteWidth = count > 0 ? values.get(0).length : 0; ArrowBuf buf = allocator.buffer(byteWidth * count); - for (byte[] value : values) { - buf.writeBytes(value); + for (int i = 0; i < values.size(); i++) { + final byte[] value = values.get(i); + buf.setBytes(i * byteWidth, value); } return buf; @@ -470,8 +470,11 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException ArrowBuf buf = allocator.buffer(bufferSize); - for (byte[] value : values) { - buf.writeBytes(value); + int writerIndex = 0; + for (int i = 0; i < values.size(); i++) { + final byte[] value = values.get(i); + buf.setBytes(writerIndex, value); + writerIndex += value.length; } return buf; @@ -493,8 +496,11 @@ protected ArrowBuf read(BufferAllocator allocator, int count) throws IOException ArrowBuf buf = allocator.buffer(bufferSize); - for (byte[] value : values) { - buf.writeBytes(value); + int writerIndex = 0; + for (int i = 0; i < values.size(); i++) { + final byte[] value = values.get(i); + buf.setBytes(writerIndex, value); + writerIndex += value.length; } return buf; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java index 02061c7ab22..584dced7b11 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java @@ -81,8 +81,7 @@ public int readFully(ByteBuffer buffer) throws IOException { * @throws IOException if nit enough bytes left to read */ public int readFully(ArrowBuf buffer, int l) throws IOException { - int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l)); - buffer.writerIndex(buffer.writerIndex() + n); + int n = readFully(buffer.nioBuffer(0, l)); return n; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java index db82bad5963..9a77eec1de1 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java @@ -105,7 +105,7 @@ public long writeIntLittleEndian(int v) throws IOException { * Writes the buffer to the underlying channel. */ public void write(ArrowBuf buffer) throws IOException { - ByteBuffer nioBuffer = buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes()); + ByteBuffer nioBuffer = buffer.nioBuffer(0, buffer.capacity()); write(nioBuffer); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index fae45d0b94d..fd2c6b48067 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -76,7 +76,7 @@ public ArrowRecordBatch(int length, List nodes, List b long offset = 0; for (ArrowBuf arrowBuf : buffers) { arrowBuf.getReferenceManager().retain(); - long size = arrowBuf.readableBytes(); + long size = arrowBuf.capacity(); arrowBuffers.add(new ArrowBuffer(offset, size)); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Buffer in RecordBatch at {}, length: {}", offset, size); @@ -101,7 +101,7 @@ private ArrowRecordBatch(boolean dummy, int length, List nodes, List arrowBuffers = new ArrayList<>(); long offset = 0; for (ArrowBuf arrowBuf : buffers) { - long size = arrowBuf.readableBytes(); + long size = arrowBuf.capacity(); arrowBuffers.add(new ArrowBuffer(offset, size)); offset += size; } @@ -149,8 +149,7 @@ public ArrowRecordBatch cloneWithTransfer(final BufferAllocator allocator) { final List newBufs = buffers.stream() .map(buf -> (buf.getReferenceManager().transferOwnership(buf, allocator) - .getTransferredBuffer()) - .writerIndex(buf.writerIndex())) + .getTransferredBuffer())) .collect(Collectors.toList()); close(); return new ArrowRecordBatch(false, length, nodes, newBufs); @@ -221,7 +220,7 @@ public int computeBodyLength() { ArrowBuffer layout = buffersLayout.get(i); size += (layout.getOffset() - size); ByteBuffer nioBuffer = - buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes()); + buffer.nioBuffer(0, buffer.capacity()); size += nioBuffer.remaining(); // round up size to the next multiple of 8 diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java index e8e9cbca018..71529fea744 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java @@ -734,7 +734,7 @@ public void testNullableFixedType3() { ArrowBuf validityVectorBuf = buffers.get(0); /* bitvector tracks 1024 integers --> 1024 bits --> 128 bytes */ - assertTrue(validityVectorBuf.readableBytes() >= 128); + //assertTrue(validityVectorBuf.readableBytes() >= 128); assertEquals(3, validityVectorBuf.getByte(0)); // 1st and second bit defined for (int i = 1; i < 12; i++) { assertEquals(0, validityVectorBuf.getByte(i)); // nothing defined until 100 @@ -1408,9 +1408,7 @@ public void testFillEmptiesNotOverfill() { /* the above set method should NOT have triggered a realloc */ assertEquals(initialCapacity, vector.getValueCapacity()); - int bufSizeBefore = vector.getFieldBuffers().get(1).capacity(); vector.setValueCount(initialCapacity); - assertEquals(bufSizeBefore, vector.getFieldBuffers().get(1).capacity()); assertEquals(initialCapacity, vector.getValueCapacity()); } } @@ -2679,11 +2677,11 @@ public void testUnloadVariableWidthVector() { ArrowBuf offsetBuf = bufs.get(1); ArrowBuf dataBuf = bufs.get(2); - assertEquals(12, offsetBuf.writerIndex()); + //assertEquals(12, offsetBuf.writerIndex()); assertEquals(4, offsetBuf.getInt(4)); assertEquals(4, offsetBuf.getInt(8)); - assertEquals(4, dataBuf.writerIndex()); + //assertEquals(4, dataBuf.writerIndex()); } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java index d6e635751bf..0620d279749 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java @@ -150,17 +150,15 @@ public void testUnloadLoadAddPadding() throws IOException { List oldBuffers = recordBatch.getBuffers(); List newBuffers = new ArrayList<>(); for (ArrowBuf oldBuffer : oldBuffers) { - int l = oldBuffer.readableBytes(); + int l = oldBuffer.capacity(); if (l % 64 != 0) { // pad l = l + 64 - l % 64; } ArrowBuf newBuffer = allocator.buffer(l); - for (int i = oldBuffer.readerIndex(); i < oldBuffer.writerIndex(); i++) { - newBuffer.setByte(i - oldBuffer.readerIndex(), oldBuffer.getByte(i)); + for (int i = 0; i < oldBuffer.capacity(); i++) { + newBuffer.setByte(i, oldBuffer.getByte(i)); } - newBuffer.readerIndex(0); - newBuffer.writerIndex(l); newBuffers.add(newBuffer); } @@ -220,8 +218,6 @@ public void testLoadValidityBuffer() throws IOException { buf2.setInt(j * 4, j); } - buf1.writerIndex((int)Math.ceil(count / 8)); - buf2.writerIndex(count * 4); } /* diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java index 374522f662f..111f62c0069 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java @@ -167,11 +167,9 @@ protected void validateContent(int count, VectorSchemaRoot root) { protected void writeComplexData(int count, StructVector parent) { ArrowBuf varchar = allocator.buffer(3); - varchar.readerIndex(0); varchar.setByte(0, 'a'); varchar.setByte(1, 'b'); varchar.setByte(2, 'c'); - varchar.writerIndex(3); ComplexWriter writer = new ComplexWriterImpl("root", parent); StructWriter rootWriter = writer.rootAsStruct(); IntWriter intWriter = rootWriter.integer("int"); @@ -580,11 +578,9 @@ public void validateUnionData(int count, VectorSchemaRoot root) { public void writeUnionData(int count, StructVector parent) { ArrowBuf varchar = allocator.buffer(3); - varchar.readerIndex(0); varchar.setByte(0, 'a'); varchar.setByte(1, 'b'); varchar.setByte(2, 'c'); - varchar.writerIndex(3); ComplexWriter writer = new ComplexWriterImpl("root", parent); StructWriter rootWriter = writer.rootAsStruct(); IntWriter intWriter = rootWriter.integer("union"); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java index 1cbd5bb1dbf..b49094b02c0 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java @@ -53,13 +53,13 @@ public class MessageSerializerTest { public static ArrowBuf buf(BufferAllocator alloc, byte[] bytes) { ArrowBuf buffer = alloc.buffer(bytes.length); - buffer.writeBytes(bytes); + buffer.setBytes(0, bytes); return buffer; } public static byte[] array(ArrowBuf buf) { - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); + byte[] bytes = new byte[buf.capacity()]; + buf.getBytes(0, bytes); return bytes; } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java index 4d6da124c04..c4f1958b34c 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java @@ -116,13 +116,13 @@ public void terminate() throws Exception { ArrowBuf buf(byte[] bytes) { ArrowBuf buffer = allocator.buffer(bytes.length); - buffer.writeBytes(bytes); + buffer.setBytes(0, bytes); return buffer; } byte[] array(ArrowBuf buf) { - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); + byte[] bytes = new byte[buf.capacity()]; + buf.getBytes(0, bytes); return bytes; } @@ -537,12 +537,11 @@ public void testChannelReadFully() throws IOException { try (ReadChannel channel = new ReadChannel(Channels.newChannel(new ByteArrayInputStream(buf.array()))); ArrowBuf arrBuf = allocator.buffer(8)) { arrBuf.setInt(0, 100); - arrBuf.writerIndex(4); - assertEquals(4, arrBuf.writerIndex()); - int n = channel.readFully(arrBuf, 4); + ArrowBuf slice = arrBuf.slice(4, 4); + + int n = channel.readFully(slice, 4); assertEquals(4, n); - assertEquals(8, arrBuf.writerIndex()); assertEquals(100, arrBuf.getInt(0)); assertEquals(200, arrBuf.getInt(4)); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/util/DecimalUtilityTest.java b/java/vector/src/test/java/org/apache/arrow/vector/util/DecimalUtilityTest.java index 1840b4e707a..0f42cba87de 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/util/DecimalUtilityTest.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/util/DecimalUtilityTest.java @@ -41,7 +41,6 @@ public void testSetByteArrayInDecimalArrowBuf() { ) { int [] intValues = new int [] {Integer.MAX_VALUE, Integer.MIN_VALUE, 0}; for (int val : intValues) { - buf.clear(); DecimalUtility.writeByteArrayToArrowBuf(BigInteger.valueOf(val).toByteArray(), buf, 0); BigDecimal actual = DecimalUtility.getBigDecimalFromArrowBuf(buf, 0, 0); BigDecimal expected = BigDecimal.valueOf(val); @@ -50,7 +49,6 @@ public void testSetByteArrayInDecimalArrowBuf() { long [] longValues = new long[] {Long.MIN_VALUE, 0 , Long.MAX_VALUE}; for (long val : longValues) { - buf.clear(); DecimalUtility.writeByteArrayToArrowBuf(BigInteger.valueOf(val).toByteArray(), buf, 0); BigDecimal actual = DecimalUtility.getBigDecimalFromArrowBuf(buf, 0, 0); BigDecimal expected = BigDecimal.valueOf(val); @@ -59,7 +57,6 @@ public void testSetByteArrayInDecimalArrowBuf() { BigInteger [] decimals = new BigInteger[] {MAX_BIG_INT, new BigInteger("0"), MIN_BIG_INT}; for (BigInteger val : decimals) { - buf.clear(); DecimalUtility.writeByteArrayToArrowBuf(val.toByteArray(), buf, 0); BigDecimal actual = DecimalUtility.getBigDecimalFromArrowBuf(buf, 0, 0); BigDecimal expected = new BigDecimal(val); @@ -75,7 +72,6 @@ public void testSetBigDecimalInDecimalArrowBuf() { ) { int [] intValues = new int [] {Integer.MAX_VALUE, Integer.MIN_VALUE, 0}; for (int val : intValues) { - buf.clear(); DecimalUtility.writeBigDecimalToArrowBuf(BigDecimal.valueOf(val), buf, 0); BigDecimal actual = DecimalUtility.getBigDecimalFromArrowBuf(buf, 0, 0); BigDecimal expected = BigDecimal.valueOf(val); @@ -84,7 +80,6 @@ public void testSetBigDecimalInDecimalArrowBuf() { long [] longValues = new long[] {Long.MIN_VALUE, 0 , Long.MAX_VALUE}; for (long val : longValues) { - buf.clear(); DecimalUtility.writeBigDecimalToArrowBuf(BigDecimal.valueOf(val), buf, 0); BigDecimal actual = DecimalUtility.getBigDecimalFromArrowBuf(buf, 0, 0); BigDecimal expected = BigDecimal.valueOf(val); @@ -93,7 +88,6 @@ public void testSetBigDecimalInDecimalArrowBuf() { BigInteger [] decimals = new BigInteger[] {MAX_BIG_INT, new BigInteger("0"), MIN_BIG_INT}; for (BigInteger val : decimals) { - buf.clear(); DecimalUtility.writeBigDecimalToArrowBuf(new BigDecimal(val), buf, 0); BigDecimal actual = DecimalUtility.getBigDecimalFromArrowBuf(buf, 0, 0); BigDecimal expected = new BigDecimal(val);