Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 42 additions & 41 deletions java/memory/src/main/java/io/netty/buffer/ArrowBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@
import org.apache.arrow.memory.BufferManager;
import org.apache.arrow.memory.ReferenceManager;
import org.apache.arrow.memory.util.HistoricalLog;
import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.Preconditions;

import io.netty.util.internal.PlatformDependent;

/**
* ArrowBuf serves as a facade over underlying memory by providing
* several access APIs to read/write data into a chunk of direct
Expand Down Expand Up @@ -349,7 +348,7 @@ private void checkIndexD(long index, long fieldLength) {
*/
public long getLong(long index) {
chk(index, LONG_SIZE);
return PlatformDependent.getLong(addr(index));
return MemoryUtil.UNSAFE.getLong(addr(index));
}

/**
Expand All @@ -361,7 +360,7 @@ public long getLong(long index) {
*/
public void setLong(long index, long value) {
chk(index, LONG_SIZE);
PlatformDependent.putLong(addr(index), value);
MemoryUtil.UNSAFE.putLong(addr(index), value);
}

/**
Expand All @@ -384,7 +383,7 @@ public float getFloat(long index) {
*/
public void setFloat(long index, float value) {
chk(index, FLOAT_SIZE);
PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value));
MemoryUtil.UNSAFE.putInt(addr(index), Float.floatToRawIntBits(value));
}

/**
Expand All @@ -407,7 +406,7 @@ public double getDouble(long index) {
*/
public void setDouble(long index, double value) {
chk(index, DOUBLE_SIZE);
PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value));
MemoryUtil.UNSAFE.putLong(addr(index), Double.doubleToRawLongBits(value));
}

/**
Expand All @@ -430,7 +429,7 @@ public char getChar(long index) {
*/
public void setChar(long index, int value) {
chk(index, SHORT_SIZE);
PlatformDependent.putShort(addr(index), (short) value);
MemoryUtil.UNSAFE.putShort(addr(index), (short) value);
}

/**
Expand All @@ -442,7 +441,7 @@ public void setChar(long index, int value) {
*/
public int getInt(long index) {
chk(index, INT_SIZE);
return PlatformDependent.getInt(addr(index));
return MemoryUtil.UNSAFE.getInt(addr(index));
}

/**
Expand All @@ -454,7 +453,7 @@ public int getInt(long index) {
*/
public void setInt(long index, int value) {
chk(index, INT_SIZE);
PlatformDependent.putInt(addr(index), value);
MemoryUtil.UNSAFE.putInt(addr(index), value);
}

/**
Expand All @@ -466,7 +465,7 @@ public void setInt(long index, int value) {
*/
public short getShort(long index) {
chk(index, SHORT_SIZE);
return PlatformDependent.getShort(addr(index));
return MemoryUtil.UNSAFE.getShort(addr(index));
}

/**
Expand All @@ -489,7 +488,7 @@ public void setShort(long index, int value) {
*/
public void setShort(long index, short value) {
chk(index, SHORT_SIZE);
PlatformDependent.putShort(addr(index), value);
MemoryUtil.UNSAFE.putShort(addr(index), value);
}

/**
Expand All @@ -501,7 +500,7 @@ public void setShort(long index, short value) {
*/
public void setByte(long index, int value) {
chk(index, 1);
PlatformDependent.putByte(addr(index), (byte) value);
MemoryUtil.UNSAFE.putByte(addr(index), (byte) value);
}

/**
Expand All @@ -513,7 +512,7 @@ public void setByte(long index, int value) {
*/
public void setByte(long index, byte value) {
chk(index, 1);
PlatformDependent.putByte(addr(index), value);
MemoryUtil.UNSAFE.putByte(addr(index), value);
}

/**
Expand All @@ -525,7 +524,7 @@ public void setByte(long index, byte value) {
*/
public byte getByte(long index) {
chk(index, 1);
return PlatformDependent.getByte(addr(index));
return MemoryUtil.UNSAFE.getByte(addr(index));
}


Expand Down Expand Up @@ -603,7 +602,7 @@ public void readBytes(byte[] dst) {
*/
public void writeByte(byte value) {
ensureWritable(1);
PlatformDependent.putByte(addr(writerIndex), value);
MemoryUtil.UNSAFE.putByte(addr(writerIndex), value);
++writerIndex;
}

Expand All @@ -614,7 +613,7 @@ public void writeByte(byte value) {
*/
public void writeByte(int value) {
ensureWritable(1);
PlatformDependent.putByte(addr(writerIndex), (byte)value);
MemoryUtil.UNSAFE.putByte(addr(writerIndex), (byte)value);
++writerIndex;
}

Expand Down Expand Up @@ -647,7 +646,7 @@ public void writeBytes(byte[] src, int srcIndex, int length) {
*/
public void writeShort(int value) {
ensureWritable(SHORT_SIZE);
PlatformDependent.putShort(addr(writerIndex), (short) value);
MemoryUtil.UNSAFE.putShort(addr(writerIndex), (short) value);
writerIndex += SHORT_SIZE;
}

Expand All @@ -657,7 +656,7 @@ public void writeShort(int value) {
*/
public void writeInt(int value) {
ensureWritable(INT_SIZE);
PlatformDependent.putInt(addr(writerIndex), value);
MemoryUtil.UNSAFE.putInt(addr(writerIndex), value);
writerIndex += INT_SIZE;
}

Expand All @@ -667,7 +666,7 @@ public void writeInt(int value) {
*/
public void writeLong(long value) {
ensureWritable(LONG_SIZE);
PlatformDependent.putLong(addr(writerIndex), value);
MemoryUtil.UNSAFE.putLong(addr(writerIndex), value);
writerIndex += LONG_SIZE;
}

Expand All @@ -677,7 +676,7 @@ public void writeLong(long value) {
*/
public void writeFloat(float value) {
ensureWritable(FLOAT_SIZE);
PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value));
MemoryUtil.UNSAFE.putInt(addr(writerIndex), Float.floatToRawIntBits(value));
writerIndex += FLOAT_SIZE;
}

Expand All @@ -687,7 +686,7 @@ public void writeFloat(float value) {
*/
public void writeDouble(double value) {
ensureWritable(DOUBLE_SIZE);
PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value));
MemoryUtil.UNSAFE.putLong(addr(writerIndex), Double.doubleToRawLongBits(value));
writerIndex += DOUBLE_SIZE;
}

Expand Down Expand Up @@ -753,7 +752,7 @@ public void getBytes(long index, byte[] dst, int dstIndex, int length) {
if (length != 0) {
// copy "length" bytes from this ArrowBuf starting at addr(index) address
// into dst byte array at dstIndex onwards
PlatformDependent.copyMemory(addr(index), dst, dstIndex, (long)length);
MemoryUtil.UNSAFE.copyMemory(null, addr(index), dst, MemoryUtil.BYTE_ARRAY_BASE_OFFSET + dstIndex, length);
}
}

Expand Down Expand Up @@ -790,7 +789,7 @@ public void setBytes(long index, byte[] src, int srcIndex, long length) {
if (length > 0) {
// copy "length" bytes from src byte array at the starting index (srcIndex)
// into this ArrowBuf starting at address "addr(index)"
PlatformDependent.copyMemory(src, srcIndex, addr(index), length);
MemoryUtil.UNSAFE.copyMemory(src, MemoryUtil.BYTE_ARRAY_BASE_OFFSET + srcIndex, null, addr(index), length);
}
}

Expand All @@ -815,16 +814,17 @@ public void getBytes(long index, ByteBuffer dst) {
// copy dst.remaining() bytes of data from this ArrowBuf starting
// at address srcAddress into the dst ByteBuffer starting at
// address dstAddress
final long dstAddress = PlatformDependent.directBufferAddress(dst) + (long)dst.position();
PlatformDependent.copyMemory(srcAddress, dstAddress, (long)dst.remaining());
final long dstAddress = MemoryUtil.getByteBufferAddress(dst) + dst.position();
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, dst.remaining());
// after copy, bump the next write position for the dst ByteBuffer
dst.position(dst.position() + dst.remaining());
} else if (dst.hasArray()) {
// copy dst.remaining() bytes of data from this ArrowBuf starting
// at address srcAddress into the dst ByteBuffer starting at
// index dstIndex
final int dstIndex = dst.arrayOffset() + dst.position();
PlatformDependent.copyMemory(srcAddress, dst.array(), dstIndex, (long)dst.remaining());
MemoryUtil.UNSAFE.copyMemory(
null, srcAddress, dst.array(), MemoryUtil.BYTE_ARRAY_BASE_OFFSET + dstIndex, dst.remaining());
// after copy, bump the next write position for the dst ByteBuffer
dst.position(dst.position() + dst.remaining());
} else {
Expand All @@ -851,34 +851,35 @@ public void setBytes(long index, ByteBuffer src) {
if (src.isDirect()) {
// copy src.remaining() bytes of data from src ByteBuffer starting at
// address srcAddress into this ArrowBuf starting at address dstAddress
final long srcAddress = PlatformDependent.directBufferAddress(src) + (long)src.position();
PlatformDependent.copyMemory(srcAddress, dstAddress, (long)length);
final long srcAddress = MemoryUtil.getByteBufferAddress(src) + src.position();
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
// after copy, bump the next read position for the src ByteBuffer
src.position(src.position() + length);
} else if (src.hasArray()) {
// copy src.remaining() bytes of data from src ByteBuffer starting at
// index srcIndex into this ArrowBuf starting at address dstAddress
final int srcIndex = src.arrayOffset() + src.position();
PlatformDependent.copyMemory(src.array(), srcIndex, dstAddress, (long)length);
MemoryUtil.UNSAFE.copyMemory(
src.array(), MemoryUtil.BYTE_ARRAY_BASE_OFFSET + srcIndex, null, dstAddress, length);
// after copy, bump the next read position for the src ByteBuffer
src.position(src.position() + length);
} else {
// copy word at a time
while (length - 128 >= LONG_SIZE) {
for (int x = 0; x < 16; x++) {
PlatformDependent.putLong(dstAddress, src.getLong());
MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong());
length -= LONG_SIZE;
dstAddress += LONG_SIZE;
}
}
while (length >= LONG_SIZE) {
PlatformDependent.putLong(dstAddress, src.getLong());
MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong());
length -= LONG_SIZE;
dstAddress += LONG_SIZE;
}
// copy last byte
while (length > 0) {
PlatformDependent.putByte(dstAddress, src.get());
MemoryUtil.UNSAFE.putByte(dstAddress, src.get());
--length;
++dstAddress;
}
Expand All @@ -903,9 +904,9 @@ public void setBytes(long index, ByteBuffer src, int srcIndex, int length) {
if (src.isDirect()) {
// copy length bytes of data from src ByteBuffer starting at address
// srcAddress into this ArrowBuf at address dstAddress
final long srcAddress = PlatformDependent.directBufferAddress(src) + srcIndex;
final long srcAddress = MemoryUtil.getByteBufferAddress(src) + srcIndex;
final long dstAddress = addr(index);
PlatformDependent.copyMemory(srcAddress, dstAddress, length);
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
} else {
if (srcIndex == 0 && src.capacity() == length) {
// copy the entire ByteBuffer from start to end of length
Expand Down Expand Up @@ -945,7 +946,7 @@ public void getBytes(long index, ArrowBuf dst, int dstIndex, int length) {
// dstAddress
final long srcAddress = addr(index);
final long dstAddress = dst.memoryAddress() + (long)dstIndex;
PlatformDependent.copyMemory(srcAddress, dstAddress, (long)length);
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
}
}

Expand Down Expand Up @@ -975,7 +976,7 @@ public void setBytes(long index, ArrowBuf src, long srcIndex, long length) {
// dstAddress
final long srcAddress = src.memoryAddress() + srcIndex;
final long dstAddress = addr(index);
PlatformDependent.copyMemory(srcAddress, dstAddress, length);
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
}
}

Expand All @@ -995,7 +996,7 @@ public void setBytes(long index, ArrowBuf src) {
checkIndex(index, length);
final long srcAddress = src.memoryAddress() + src.readerIndex;
final long dstAddress = addr(index);
PlatformDependent.copyMemory(srcAddress, dstAddress, length);
MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length);
src.readerIndex(src.readerIndex + length);
}

Expand All @@ -1020,7 +1021,7 @@ public int setBytes(long index, InputStream in, int length) throws IOException {
if (readBytes > 0) {
// copy readBytes length of data from the tmp byte array starting
// at srcIndex 0 into this ArrowBuf starting at address addr(index)
PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes);
MemoryUtil.UNSAFE.copyMemory(tmp, MemoryUtil.BYTE_ARRAY_BASE_OFFSET, null, addr(index), readBytes);
}
}
return readBytes;
Expand All @@ -1042,7 +1043,7 @@ public void getBytes(long index, OutputStream out, int length) throws IOExceptio
// copy length bytes of data from this ArrowBuf starting at
// address addr(index) into the tmp byte array starting at index 0
byte[] tmp = new byte[length];
PlatformDependent.copyMemory(addr(index), tmp, 0, length);
MemoryUtil.UNSAFE.copyMemory(null, addr(index), tmp, MemoryUtil.BYTE_ARRAY_BASE_OFFSET, length);
// write the copied data to output stream
out.write(tmp);
}
Expand Down Expand Up @@ -1170,7 +1171,7 @@ public ArrowBuf writerIndex(long writerIndex) {
public ArrowBuf setZero(long index, long length) {
if (length != 0) {
this.checkIndex(index, length);
PlatformDependent.setMemory(this.addr + index, length, (byte) 0);
MemoryUtil.UNSAFE.setMemory(this.addr + index, length, (byte) 0);
}
return this;
}
Expand All @@ -1185,7 +1186,7 @@ public ArrowBuf setZero(long index, long length) {
public ArrowBuf setOne(int index, int length) {
if (length != 0) {
this.checkIndex(index, length);
PlatformDependent.setMemory(this.addr + index, length, (byte) 0xff);
MemoryUtil.UNSAFE.setMemory(this.addr + index, length, (byte) 0xff);
}
return this;
}
Expand Down
Loading