From 85620bd689dcea3676e03228bb9be9fc6b06d2ef Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 15 Jul 2020 15:11:37 +0800 Subject: [PATCH] ARROW-9475: [Java] Clean up usages of BaseAllocator, use BufferAllocator instead --- .../org/apache/arrow/memory/Accountant.java | 3 +- .../arrow/memory/AllocationManager.java | 39 +++++++++++-------- .../apache/arrow/memory/BaseAllocator.java | 16 +++++--- .../apache/arrow/memory/BufferAllocator.java | 32 +++++++++++++++ .../org/apache/arrow/memory/BufferLedger.java | 22 +++++------ .../DefaultAllocationManagerFactory.java | 2 +- .../DefaultAllocationManagerFactory.java | 2 +- .../arrow/memory/NettyAllocationManager.java | 6 +-- .../arrow/memory/TestBaseAllocator.java | 2 +- .../memory/TestNettyAllocationManager.java | 2 +- .../DefaultAllocationManagerFactory.java | 2 +- .../arrow/memory/UnsafeAllocationManager.java | 4 +- 12 files changed, 87 insertions(+), 45 deletions(-) diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java index da93511b4f2..42dac7b8c60 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/Accountant.java @@ -140,7 +140,7 @@ private void updatePeak() { * @param size to increase * @return Whether the allocation fit within limits. */ - boolean forceAllocate(long size) { + public boolean forceAllocate(long size) { final AllocationOutcome.Status outcome = allocate(size, true, true, null); return outcome.isOk(); } @@ -220,7 +220,6 @@ public void releaseBytes(long size) { final long actualToReleaseToParent = Math.min(size, possibleAmountToReleaseToParent); parent.releaseBytes(actualToReleaseToParent); } - } public boolean isOverLimit() { diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java index c61d041097e..9c7cfa9d90d 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -47,11 +47,11 @@ public abstract class AllocationManager { private static final AtomicLong MANAGER_ID_GENERATOR = new AtomicLong(0); - private final RootAllocator root; + private final BufferAllocator root; private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet(); // ARROW-1627 Trying to minimize memory overhead caused by previously used IdentityHashMap // see JIRA for details - private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>(); + private final LowCostIdentityHashMap map = new LowCostIdentityHashMap<>(); private final long amCreationTime = System.nanoTime(); // The ReferenceManager created at the time of creation of this AllocationManager @@ -60,11 +60,11 @@ public abstract class AllocationManager { private volatile BufferLedger owningLedger; private volatile long amDestructionTime = 0; - protected AllocationManager(BaseAllocator accountingAllocator) { + protected AllocationManager(BufferAllocator accountingAllocator) { Preconditions.checkNotNull(accountingAllocator); accountingAllocator.assertOpen(); - this.root = accountingAllocator.root; + this.root = accountingAllocator.getRoot(); // we do a no retain association since our creator will want to retrieve the newly created // ledger and will create a reference count at that point @@ -87,13 +87,13 @@ void setOwningLedger(final BufferLedger ledger) { * @return The reference manager (new or existing) that associates the underlying * buffer to this new ledger. */ - BufferLedger associate(final BaseAllocator allocator) { + BufferLedger associate(final BufferAllocator allocator) { return associate(allocator, true); } - private BufferLedger associate(final BaseAllocator allocator, final boolean retain) { + private BufferLedger associate(final BufferAllocator allocator, final boolean retain) { allocator.assertOpen(); - Preconditions.checkState(root == allocator.root, + Preconditions.checkState(root == allocator.getRoot(), "A buffer can only be associated between two allocators that share the same root"); synchronized (this) { @@ -118,9 +118,11 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta Preconditions.checkState(oldLedger == null, "Detected inconsistent state: A reference manager already exists for this allocator"); - // needed for debugging only: keep a pointer to reference manager inside allocator - // to dump state, verify allocator state etc - allocator.associateLedger(ledger); + if (allocator instanceof BaseAllocator) { + // needed for debugging only: keep a pointer to reference manager inside allocator + // to dump state, verify allocator state etc + ((BaseAllocator) allocator).associateLedger(ledger); + } return ledger; } } @@ -133,7 +135,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta * calling ReferenceManager drops to 0. */ void release(final BufferLedger ledger) { - final BaseAllocator allocator = (BaseAllocator) ledger.getAllocator(); + final BufferAllocator allocator = ledger.getAllocator(); allocator.assertOpen(); // remove the mapping for the allocator @@ -142,9 +144,12 @@ void release(final BufferLedger ledger) { "Expecting a mapping for allocator and reference manager"); final BufferLedger oldLedger = map.remove(allocator); - // needed for debug only: tell the allocator that AllocationManager is removing a - // reference manager associated with this particular allocator - ((BaseAllocator) oldLedger.getAllocator()).dissociateLedger(oldLedger); + BufferAllocator oldAllocator = oldLedger.getAllocator(); + if (oldAllocator instanceof BaseAllocator) { + // needed for debug only: tell the allocator that AllocationManager is removing a + // reference manager associated with this particular allocator + ((BaseAllocator) oldAllocator).dissociateLedger(oldLedger); + } if (oldLedger == owningLedger) { // the release call was made by the owning reference manager @@ -152,10 +157,10 @@ void release(final BufferLedger ledger) { // the only mapping was for the owner // which now has been removed, it implies we can safely destroy the // underlying memory chunk as it is no longer being referenced - ((BaseAllocator) oldLedger.getAllocator()).releaseBytes(getSize()); + oldAllocator.releaseBytes(getSize()); // free the memory chunk associated with the allocation manager release0(); - ((BaseAllocator) oldLedger.getAllocator()).getListener().onRelease(getSize()); + oldAllocator.getListener().onRelease(getSize()); amDestructionTime = System.nanoTime(); owningLedger = null; } else { @@ -209,7 +214,7 @@ public interface Factory { * @param size Size (in bytes) of memory managed by the AllocationManager * @return The created AllocationManager used by this allocator */ - AllocationManager create(BaseAllocator accountingAllocator, long size); + AllocationManager create(BufferAllocator accountingAllocator, long size); ArrowBuf empty(); } diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 81f664985d5..246b2212e26 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -61,10 +61,10 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator { public static final Config DEFAULT_CONFIG = ImmutableConfig.builder().build(); // Package exposed for sharing between AllocatorManger and BaseAllocator objects - final String name; - final RootAllocator root; + private final String name; + private final RootAllocator root; private final Object DEBUG_LOCK = DEBUG ? new Object() : null; - final AllocationListener listener; + private final AllocationListener listener; private final BaseAllocator parentAllocator; private final Map childAllocators; private final ArrowBuf empty; @@ -124,7 +124,8 @@ protected BaseAllocator( this.roundingPolicy = config.getRoundingPolicy(); } - AllocationListener getListener() { + @Override + public AllocationListener getListener() { return listener; } @@ -314,6 +315,11 @@ private AllocationManager newAllocationManager(BaseAllocator accountingAllocator return allocationManagerFactory.create(accountingAllocator, size); } + @Override + public BufferAllocator getRoot() { + return root; + } + @Override public BufferAllocator newChildAllocator( final String name, @@ -343,7 +349,7 @@ public BufferAllocator newChildAllocator( synchronized (DEBUG_LOCK) { childAllocators.put(childAllocator, childAllocator); historicalLog.recordEvent("allocator[%s] created new child allocator[%s]", name, - childAllocator.name); + childAllocator.getName()); } } else { childAllocators.put(childAllocator, childAllocator); diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java index aa1f856c591..8fbf6f7b073 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferAllocator.java @@ -49,6 +49,14 @@ public interface BufferAllocator extends AutoCloseable { */ ArrowBuf buffer(long size, BufferManager manager); + /** + * Get the root allocator of this allocator. If this allocator is already a root, return + * this directly. + * + * @return The root allocator + */ + BufferAllocator getRoot(); + /** * Create a new child allocator. * @@ -126,6 +134,30 @@ BufferAllocator newChildAllocator( */ long getHeadroom(); + /** + * Forcibly allocate bytes. Returns whether the allocation fit within limits. + * + * @param size to increase + * @return Whether the allocation fit within limits. + */ + boolean forceAllocate(long size); + + + /** + * Release bytes from this allocator. + * + * @param size to release + */ + void releaseBytes(long size); + + /** + * Returns the allocation listener used by this allocator. + * + * @return the {@link AllocationListener} instance. Or {@link AllocationListener#NOOP} by default if no listener + * is configured when this allocator was created. + */ + AllocationListener getListener(); + /** * Returns the parent allocator. * diff --git a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java index 9fa4de71d8d..48b3e183d5a 100644 --- a/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/memory-core/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -31,7 +31,7 @@ * ArrowBufs managed by this reference manager share a common * fate (same reference count). */ -public class BufferLedger implements ValueWithKeyIncluded, ReferenceManager { +public class BufferLedger implements ValueWithKeyIncluded, ReferenceManager { private final IdentityHashMap buffers = BaseAllocator.DEBUG ? new IdentityHashMap<>() : null; private static final AtomicLong LEDGER_ID_GENERATOR = new AtomicLong(0); @@ -41,14 +41,14 @@ public class BufferLedger implements ValueWithKeyIncluded, Refere // manage request for retain // correctly private final long lCreationTime = System.nanoTime(); - private final BaseAllocator allocator; + private final BufferAllocator allocator; private final AllocationManager allocationManager; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog(BaseAllocator.DEBUG_LOG_LENGTH, "BufferLedger[%d]", 1) : null; private volatile long lDestructionTime = 0; - BufferLedger(final BaseAllocator allocator, final AllocationManager allocationManager) { + BufferLedger(final BufferAllocator allocator, final AllocationManager allocationManager) { this.allocator = allocator; this.allocationManager = allocationManager; } @@ -57,7 +57,7 @@ boolean isOwningLedger() { return this == allocationManager.getOwningLedger(); } - public BaseAllocator getKey() { + public BufferAllocator getKey() { return allocator; } @@ -238,7 +238,7 @@ public ArrowBuf deriveBuffer(final ArrowBuf sourceBuffer, long index, long lengt "ArrowBuf(BufferLedger, BufferAllocator[%s], " + "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d", - allocator.name, System.identityHashCode(derivedBuf), derivedBuf.toString(), + allocator.getName(), System.identityHashCode(derivedBuf), derivedBuf.toString(), System.identityHashCode(this)); synchronized (buffers) { @@ -275,7 +275,7 @@ ArrowBuf newArrowBuf(final long length, final BufferManager manager) { historicalLog.recordEvent( "ArrowBuf(BufferLedger, BufferAllocator[%s], " + "UnsafeDirectLittleEndian[identityHashCode == " + "%d](%s)) => ledger hc == %d", - allocator.name, System.identityHashCode(buf), buf.toString(), + allocator.getName(), System.identityHashCode(buf), buf.toString(), System.identityHashCode(this)); synchronized (buffers) { @@ -317,7 +317,7 @@ public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) { // alternatively, if there was already a mapping for in // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1 // and this will be true for all the existing buffers currently managed by targetrefmanager - final BufferLedger targetRefManager = allocationManager.associate((BaseAllocator) target); + final BufferLedger targetRefManager = allocationManager.associate(target); // create a new ArrowBuf to associate with new allocator and target ref manager final long targetBufLength = srcBuffer.capacity(); ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength); @@ -336,8 +336,8 @@ public ArrowBuf retain(final ArrowBuf srcBuffer, BufferAllocator target) { boolean transferBalance(final ReferenceManager targetReferenceManager) { Preconditions.checkArgument(targetReferenceManager != null, "Expecting valid target reference manager"); - final BaseAllocator targetAllocator = (BaseAllocator) targetReferenceManager.getAllocator(); - Preconditions.checkArgument(allocator.root == targetAllocator.root, + final BufferAllocator targetAllocator = targetReferenceManager.getAllocator(); + Preconditions.checkArgument(allocator.getRoot() == targetAllocator.getRoot(), "You can only transfer between two allocators that share the same root."); allocator.assertOpen(); @@ -411,7 +411,7 @@ public TransferResult transferOwnership(final ArrowBuf srcBuffer, final BufferAl // alternatively, if there was already a mapping for in // allocation manager, the ref count of the new buffer will be targetrefmanager.refcount() + 1 // and this will be true for all the existing buffers currently managed by targetrefmanager - final BufferLedger targetRefManager = allocationManager.associate((BaseAllocator) target); + final BufferLedger targetRefManager = allocationManager.associate(target); // create a new ArrowBuf to associate with new allocator and target ref manager final long targetBufLength = srcBuffer.capacity(); final ArrowBuf targetArrowBuf = targetRefManager.deriveBuffer(srcBuffer, 0, targetBufLength); @@ -486,7 +486,7 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { .append("ledger[") .append(ledgerId) .append("] allocator: ") - .append(allocator.name) + .append(allocator.getName()) .append("), isOwning: ") .append(", size: ") .append(", references: ") diff --git a/java/memory/memory-core/src/test/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java index e4553104715..bfe496532b1 100644 --- a/java/memory/memory-core/src/test/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java +++ b/java/memory/memory-core/src/test/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java @@ -34,7 +34,7 @@ public class DefaultAllocationManagerFactory implements AllocationManager.Factor MemoryUtil.UNSAFE.allocateMemory(0)); @Override - public AllocationManager create(BaseAllocator accountingAllocator, long size) { + public AllocationManager create(BufferAllocator accountingAllocator, long size) { return new AllocationManager(accountingAllocator) { private final long allocatedSize = size; private final long address = MemoryUtil.UNSAFE.allocateMemory(size); diff --git a/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java b/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java index 15651a38e4a..10cfb5c1648 100644 --- a/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java +++ b/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java @@ -26,7 +26,7 @@ public class DefaultAllocationManagerFactory implements AllocationManager.Factor public static final AllocationManager.Factory FACTORY = NettyAllocationManager.FACTORY; @Override - public AllocationManager create(BaseAllocator accountingAllocator, long size) { + public AllocationManager create(BufferAllocator accountingAllocator, long size) { return FACTORY.create(accountingAllocator, size); } diff --git a/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java index 45bd5d91347..20004778307 100644 --- a/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java +++ b/java/memory/memory-netty/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java @@ -30,7 +30,7 @@ public class NettyAllocationManager extends AllocationManager { public static final AllocationManager.Factory FACTORY = new AllocationManager.Factory() { @Override - public AllocationManager create(BaseAllocator accountingAllocator, long size) { + public AllocationManager create(BufferAllocator accountingAllocator, long size) { return new NettyAllocationManager(accountingAllocator, size); } @@ -65,7 +65,7 @@ public ArrowBuf empty() { */ private final int allocationCutOffValue; - NettyAllocationManager(BaseAllocator accountingAllocator, long requestedSize, int allocationCutOffValue) { + NettyAllocationManager(BufferAllocator accountingAllocator, long requestedSize, int allocationCutOffValue) { super(accountingAllocator); this.allocationCutOffValue = allocationCutOffValue; @@ -80,7 +80,7 @@ public ArrowBuf empty() { } } - NettyAllocationManager(BaseAllocator accountingAllocator, long requestedSize) { + NettyAllocationManager(BufferAllocator accountingAllocator, long requestedSize) { this(accountingAllocator, requestedSize, DEFAULT_ALLOCATION_CUTOFF_VALUE); } diff --git a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index a42e272a42e..ef49e41785f 100644 --- a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -393,7 +393,7 @@ private BaseAllocator createAllocatorWithCustomizedAllocationManager() { .maxAllocation(MAX_ALLOCATION) .allocationManagerFactory(new AllocationManager.Factory() { @Override - public AllocationManager create(BaseAllocator accountingAllocator, long requestedSize) { + public AllocationManager create(BufferAllocator accountingAllocator, long requestedSize) { return new AllocationManager(accountingAllocator) { private final Unsafe unsafe = getUnsafe(); private final long address = unsafe.allocateMemory(requestedSize); diff --git a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java index f386ea66b2a..1b64cd73363 100644 --- a/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java +++ b/java/memory/memory-netty/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java @@ -35,7 +35,7 @@ private BaseAllocator createCustomizedAllocator() { return new RootAllocator(BaseAllocator.configBuilder() .allocationManagerFactory(new AllocationManager.Factory() { @Override - public AllocationManager create(BaseAllocator accountingAllocator, long size) { + public AllocationManager create(BufferAllocator accountingAllocator, long size) { return new NettyAllocationManager(accountingAllocator, size, CUSTOMIZED_ALLOCATION_CUTOFF_VALUE); } diff --git a/java/memory/memory-unsafe/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java b/java/memory/memory-unsafe/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java index 3963c1875d0..720c3d02d23 100644 --- a/java/memory/memory-unsafe/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java +++ b/java/memory/memory-unsafe/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java @@ -26,7 +26,7 @@ public class DefaultAllocationManagerFactory implements AllocationManager.Factor public static final AllocationManager.Factory FACTORY = UnsafeAllocationManager.FACTORY; @Override - public AllocationManager create(BaseAllocator accountingAllocator, long size) { + public AllocationManager create(BufferAllocator accountingAllocator, long size) { return FACTORY.create(accountingAllocator, size); } diff --git a/java/memory/memory-unsafe/src/main/java/org/apache/arrow/memory/UnsafeAllocationManager.java b/java/memory/memory-unsafe/src/main/java/org/apache/arrow/memory/UnsafeAllocationManager.java index f9756539c55..b10aba3598d 100644 --- a/java/memory/memory-unsafe/src/main/java/org/apache/arrow/memory/UnsafeAllocationManager.java +++ b/java/memory/memory-unsafe/src/main/java/org/apache/arrow/memory/UnsafeAllocationManager.java @@ -32,7 +32,7 @@ public final class UnsafeAllocationManager extends AllocationManager { public static final AllocationManager.Factory FACTORY = new Factory() { @Override - public AllocationManager create(BaseAllocator accountingAllocator, long size) { + public AllocationManager create(BufferAllocator accountingAllocator, long size) { return new UnsafeAllocationManager(accountingAllocator, size); } @@ -46,7 +46,7 @@ public ArrowBuf empty() { private final long allocatedAddress; - UnsafeAllocationManager(BaseAllocator accountingAllocator, long requestedSize) { + UnsafeAllocationManager(BufferAllocator accountingAllocator, long requestedSize) { super(accountingAllocator); allocatedAddress = MemoryUtil.UNSAFE.allocateMemory(requestedSize); allocatedSize = requestedSize;