diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index aaa1f506fb5..687674f951b 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -22,11 +22,8 @@ import java.util.IdentityHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.arrow.memory.BaseAllocator.Verbosity; -import org.apache.arrow.memory.util.AutoCloseableLock; import org.apache.arrow.memory.util.HistoricalLog; import org.apache.arrow.util.Preconditions; @@ -73,9 +70,6 @@ public class AllocationManager { // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap // see JIRA for details private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>(); - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock()); - private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock()); private final long amCreationTime = System.nanoTime(); private volatile BufferLedger owningLedger; @@ -115,9 +109,8 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta "A buffer can only be associated between two allocators that share the same root."); } - try (AutoCloseableLock read = readLock.open()) { - - final BufferLedger ledger = map.get(allocator); + synchronized (this) { + BufferLedger ledger = map.get(allocator); if (ledger != null) { if (retain) { ledger.inc(); @@ -125,20 +118,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta return ledger; } - } - try (AutoCloseableLock write = writeLock.open()) { - // we have to recheck existing ledger since a second reader => writer could be competing - // with us. - - final BufferLedger existingLedger = map.get(allocator); - if (existingLedger != null) { - if (retain) { - existingLedger.inc(); - } - return existingLedger; - } - - final BufferLedger ledger = new BufferLedger(allocator); + ledger = new BufferLedger(allocator); if (retain) { ledger.inc(); } @@ -153,7 +133,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta * The way that a particular BufferLedger communicates back to the AllocationManager that it * now longer needs to hold * a reference to particular piece of memory. - * Can only be called when you already hold the writeLock. + * Can only be called when you already hold the lock. */ private void release(final BufferLedger ledger) { final BaseAllocator allocator = ledger.getAllocator(); @@ -250,7 +230,7 @@ public boolean transferBalance(final BufferLedger target) { // since two balance transfers out from the allocator manager could cause incorrect // accounting, we need to ensure // that this won't happen by synchronizing on the allocator manager instance. - try (AutoCloseableLock write = writeLock.open()) { + synchronized (this) { if (owningLedger != this) { return true; } @@ -330,7 +310,7 @@ public int decrement(int decrement) { allocator.assertOpen(); final int outcome; - try (AutoCloseableLock write = writeLock.open()) { + synchronized (this) { outcome = bufRefCnt.addAndGet(-decrement); if (outcome == 0) { lDestructionTime = System.nanoTime(); @@ -431,7 +411,7 @@ public int getSize() { * @return Amount of accounted(owned) memory associated with this ledger. */ public int getAccountedSize() { - try (AutoCloseableLock read = readLock.open()) { + synchronized (this) { if (owningLedger == this) { return size; } else { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java index bc0b77a0aeb..f69a9d1754a 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java @@ -270,7 +270,7 @@ public boolean allocateNewSafe() { long curAllocationSizeValue = valueAllocationSizeInBytes; long curAllocationSizeValidity = validityAllocationSizeInBytes; - if (curAllocationSizeValue > MAX_ALLOCATION_SIZE) { + if (align(curAllocationSizeValue) + curAllocationSizeValidity > MAX_ALLOCATION_SIZE) { throw new OversizedAllocationException("Requested amount of memory exceeds limit"); } @@ -302,7 +302,7 @@ public void allocateNew(int valueCount) { valueBufferSize = validityBufferSize; } - if (valueBufferSize > MAX_ALLOCATION_SIZE) { + if (align(valueBufferSize) + validityBufferSize > MAX_ALLOCATION_SIZE) { throw new OversizedAllocationException("Requested amount of memory is more than max allowed"); } @@ -317,6 +317,13 @@ public void allocateNew(int valueCount) { } } + /* + * align to a 8-byte value. + */ + private long align(long size) { + return ((size + 7) / 8) * 8; + } + /** * Actual memory allocation is done by this function. All the calculations * and knowledge about what size to allocate is upto the callers of this @@ -327,14 +334,24 @@ public void allocateNew(int valueCount) { * conditions. */ private void allocateBytes(final long valueBufferSize, final long validityBufferSize) { - /* allocate data buffer */ - int curSize = (int) valueBufferSize; - valueBuffer = allocator.buffer(curSize); + int valueBufferSlice = (int)align(valueBufferSize); + int validityBufferSlice = (int)validityBufferSize; + + /* allocate combined buffer */ + ArrowBuf buffer = allocator.buffer(valueBufferSlice + validityBufferSlice); + + valueAllocationSizeInBytes = valueBufferSlice; + valueBuffer = buffer.slice(0, valueBufferSlice); + valueBuffer.retain(); valueBuffer.readerIndex(0); - valueAllocationSizeInBytes = curSize; - /* allocate validity buffer */ - allocateValidityBuffer((int) validityBufferSize); + + validityAllocationSizeInBytes = validityBufferSlice; + validityBuffer = buffer.slice(valueBufferSlice, validityBufferSlice); + validityBuffer.retain(); + validityBuffer.readerIndex(0); zeroVector(); + + buffer.release(); } /** @@ -422,43 +439,50 @@ public ArrowBuf[] getBuffers(boolean clear) { */ @Override public void reAlloc() { - valueBuffer = reallocBufferHelper(valueBuffer, true); - validityBuffer = reallocBufferHelper(validityBuffer, false); - } - - /** - * Helper method for reallocating a particular internal buffer - * Returns the new buffer. - */ - private ArrowBuf reallocBufferHelper(ArrowBuf buffer, final boolean dataBuffer) { - final int currentBufferCapacity = buffer.capacity(); - long baseSize = (dataBuffer ? valueAllocationSizeInBytes - : validityAllocationSizeInBytes); + int valueBaseSize = Integer.max(valueBuffer.capacity(), valueAllocationSizeInBytes); + long newValueBufferSlice = align(valueBaseSize * 2L); + long newValidityBufferSlice; + if (typeWidth > 0) { + long targetValueBufferSize = align(BaseAllocator.nextPowerOfTwo(newValueBufferSlice)); + long targetValueCount = targetValueBufferSize / typeWidth; + targetValueBufferSize -= getValidityBufferSizeFromCount((int) targetValueCount); + if (newValueBufferSlice < targetValueBufferSize) { + newValueBufferSlice = targetValueBufferSize; + } - if (baseSize < (long) currentBufferCapacity) { - baseSize = (long) currentBufferCapacity; + newValidityBufferSlice = getValidityBufferSizeFromCount((int)(newValueBufferSlice / typeWidth)); + } else { + newValidityBufferSlice = newValueBufferSlice; } - long newAllocationSize = baseSize * 2L; - newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize); + long newAllocationSize = newValueBufferSlice + newValidityBufferSlice; assert newAllocationSize >= 1; if (newAllocationSize > MAX_ALLOCATION_SIZE) { throw new OversizedAllocationException("Unable to expand the buffer"); } - final ArrowBuf newBuf = allocator.buffer((int) newAllocationSize); - newBuf.setBytes(0, buffer, 0, currentBufferCapacity); - newBuf.setZero(currentBufferCapacity, newBuf.capacity() - currentBufferCapacity); - buffer.release(1); - buffer = newBuf; - if (dataBuffer) { - valueAllocationSizeInBytes = (int) newAllocationSize; - } else { - validityAllocationSizeInBytes = (int) newAllocationSize; - } + final ArrowBuf newBuffer = allocator.buffer((int) newAllocationSize); + final ArrowBuf newValueBuffer = newBuffer.slice(0, (int)newValueBufferSlice); + newValueBuffer.setBytes(0, valueBuffer, 0, valueBuffer.capacity()); + newValueBuffer.setZero(valueBuffer.capacity(), (int)newValueBufferSlice - valueBuffer.capacity()); + newValueBuffer.retain(); + newValueBuffer.readerIndex(0); + valueBuffer.release(); + valueBuffer = newValueBuffer; + valueAllocationSizeInBytes = (int)newValueBufferSlice; + + final ArrowBuf newValidityBuffer = newBuffer.slice((int)newValueBufferSlice, + (int)newValidityBufferSlice); + newValidityBuffer.setBytes(0, validityBuffer, 0, validityBuffer.capacity()); + newValidityBuffer.setZero(validityBuffer.capacity(), (int)newValidityBufferSlice - validityBuffer.capacity()); + newValidityBuffer.retain(); + newValidityBuffer.readerIndex(0); + validityBuffer.release(); + validityBuffer = newValidityBuffer; + validityAllocationSizeInBytes = (int)newValidityBufferSlice; - return buffer; + newBuffer.release(); } @Override diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java index 48bc8936d9f..9165343bfdc 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestBufferOwnershipTransfer.java @@ -40,15 +40,14 @@ public void testTransferFixedWidth() { IntVector v1 = new IntVector("v1", childAllocator1); v1.allocateNew(); v1.setValueCount(4095); + long totalAllocatedMemory = childAllocator1.getAllocatedMemory(); IntVector v2 = new IntVector("v2", childAllocator2); v1.makeTransferPair(v2).transfer(); assertEquals(0, childAllocator1.getAllocatedMemory()); - int expectedBitVector = 512; - int expectedValueVector = 4096 * 4; - assertEquals(expectedBitVector + expectedValueVector, childAllocator2.getAllocatedMemory()); + assertEquals(totalAllocatedMemory, childAllocator2.getAllocatedMemory()); } @Test diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java index 4e8d8f0f399..68102b1c32a 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestListVector.java @@ -774,13 +774,13 @@ public void testSetInitialCapacity() { vector.setInitialCapacity(512); vector.allocateNew(); assertEquals(512, vector.getValueCapacity()); - assertEquals(4096, vector.getDataVector().getValueCapacity()); + assertTrue(vector.getDataVector().getValueCapacity() >= 512 * 5); /* use density as 4 */ vector.setInitialCapacity(512, 4); vector.allocateNew(); assertEquals(512, vector.getValueCapacity()); - assertEquals(512 * 4, vector.getDataVector().getValueCapacity()); + assertTrue(vector.getDataVector().getValueCapacity() >= 512 * 4); /** * inner value capacity we pass to data vector is 512 * 0.1 => 51 @@ -793,7 +793,7 @@ public void testSetInitialCapacity() { vector.setInitialCapacity(512, 0.1); vector.allocateNew(); assertEquals(512, vector.getValueCapacity()); - assertEquals(64, vector.getDataVector().getValueCapacity()); + assertTrue(vector.getDataVector().getValueCapacity() >= 51); /** * inner value capacity we pass to data vector is 512 * 0.01 => 5 @@ -806,7 +806,7 @@ public void testSetInitialCapacity() { vector.setInitialCapacity(512, 0.01); vector.allocateNew(); assertEquals(512, vector.getValueCapacity()); - assertEquals(8, vector.getDataVector().getValueCapacity()); + assertTrue(vector.getDataVector().getValueCapacity() >= 5); /** * inner value capacity we pass to data vector is 5 * 0.1 => 0 @@ -822,7 +822,7 @@ public void testSetInitialCapacity() { vector.setInitialCapacity(5, 0.1); vector.allocateNew(); assertEquals(7, vector.getValueCapacity()); - assertEquals(1, vector.getDataVector().getValueCapacity()); + assertTrue(vector.getDataVector().getValueCapacity() >= 1); } }